Diffstat (limited to 'yarn/src/test')
-rw-r--r--  yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider | 1
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 97
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala | 150
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala | 71
4 files changed, 223 insertions, 96 deletions
diff --git a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
new file mode 100644
index 0000000000..d0ef5efa36
--- /dev/null
+++ b/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -0,0 +1 @@
+org.apache.spark.deploy.yarn.security.TestCredentialProvider
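This one-line resource file is a standard java.util.ServiceLoader registration: implementations of ServiceCredentialProvider are discovered at runtime by scanning the classpath for a resource with this name, each line naming an implementation class. A minimal sketch of how such a registration is typically consumed (illustrative only, not taken from the ConfigurableCredentialManager source; the object and method names are made up):

object ProviderDiscoverySketch {
  import java.util.ServiceLoader
  import scala.collection.JavaConverters._

  import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

  def discoveredServiceNames(): Seq[String] = {
    // Load every implementation listed under
    // META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider.
    val loader = ServiceLoader.load(
      classOf[ServiceCredentialProvider],
      Thread.currentThread().getContextClassLoader)
    // With the resource file above on the test classpath, this includes "test",
    // the serviceName of TestCredentialProvider defined later in this diff.
    loader.asScala.map(_.serviceName).toSeq
  }
}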
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index fe09808ae5..7fbbe12609 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -18,13 +18,9 @@
package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
-import java.lang.reflect.InvocationTargetException
import java.nio.charset.StandardCharsets
import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.io.Text
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -32,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
-import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -173,64 +169,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
}
- test("check access nns empty") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns unset") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access nns space") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access two nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val util = new YarnSparkHadoopUtil
- val renewer = util.getTokenRenewer(hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val util = new YarnSparkHadoopUtil
- val caught =
- intercept[SparkException] {
- util.getTokenRenewer(hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
-
test("check different hadoop utils based on env variable") {
try {
System.setProperty("SPARK_YARN_MODE", "true")
@@ -242,40 +180,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
}
- test("Obtain tokens For HiveMetastore") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hive.metastore.kerberos.principal", "bob")
- // thrift picks up on port 0 and bails out, without trying to talk to endpoint
- hadoopConf.set("hive.metastore.uris", "http://localhost:0")
- val util = new YarnSparkHadoopUtil
- assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastoreInner(hadoopConf)
- })
- assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastore(hadoopConf)
- })
- }
- private def assertNestedHiveException(e: InvocationTargetException): Throwable = {
- val inner = e.getCause
- if (inner == null) {
- fail("No inner cause", e)
- }
- if (!inner.isInstanceOf[HiveException]) {
- fail("Not a hive exception", inner)
- }
- inner
- }
-
- test("Obtain tokens For HBase") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hbase.security.authentication", "kerberos")
- val util = new YarnSparkHadoopUtil
- intercept[ClassNotFoundException] {
- util.obtainTokenForHBaseInner(hadoopConf)
- }
- util.obtainTokenForHBase(hadoopConf) should be (None)
- }
// This test needs to live here because it depends on isYarnMode returning true, which can only
// happen in the YARN module.
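The deleted "check access nns" tests above pinned down how the comma-separated spark.yarn.access.namenodes value was parsed. A minimal sketch of the behavior those assertions fixed, written here for illustration (the removed getNameNodesToAccess implementation may have differed in detail): blank entries are dropped and each remaining entry becomes a Hadoop Path.

object NameNodeConfSketch {
  import org.apache.hadoop.fs.Path
  import org.apache.spark.SparkConf

  def nameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
    sparkConf.get("spark.yarn.access.namenodes", "")
      .split(",")
      .map(_.trim)         // "hdfs://nn1:8032, " keeps only the first entry
      .filter(_.nonEmpty)  // "" and unset both yield Set()
      .map(new Path(_))
      .toSet
  }
}

This matches the deleted assertions: an empty or unset value yields Set(), and "hdfs://nn1:8032,hdfs://nn2:8032" yields a two-element set of Paths.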
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
new file mode 100644
index 0000000000..db4619e80c
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.Token
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
+
+class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+ private var credentialManager: ConfigurableCredentialManager = null
+ private var sparkConf: SparkConf = null
+ private var hadoopConf: Configuration = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ sparkConf = new SparkConf()
+ hadoopConf = new Configuration()
+ System.setProperty("SPARK_YARN_MODE", "true")
+ }
+
+ override def afterAll(): Unit = {
+ System.clearProperty("SPARK_YARN_MODE")
+
+ super.afterAll()
+ }
+
+ test("Correctly load default credential providers") {
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+ credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+ credentialManager.getServiceCredentialProvider("hive") should not be (None)
+ }
+
+ test("disable hive credential provider") {
+ sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+ credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+ credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+ credentialManager.getServiceCredentialProvider("hive") should be (None)
+ }
+
+ test("using deprecated configurations") {
+ sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+ sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+ credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+ credentialManager.getServiceCredentialProvider("hive") should be (None)
+ credentialManager.getServiceCredentialProvider("test") should not be (None)
+ credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+ }
+
+ test("verify obtaining credentials from provider") {
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+ val creds = new Credentials()
+
+ // Tokens can only be obtained from TestCredentialProvider; the hdfs, hbase and
+ // hive providers cannot obtain tokens in this test environment.
+ credentialManager.obtainCredentials(hadoopConf, creds)
+ val tokens = creds.getAllTokens
+ tokens.size() should be (1)
+ tokens.iterator().next().getService should be (new Text("test"))
+ }
+
+ test("verify getting credential renewal info") {
+ credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+ val creds = new Credentials()
+
+ val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get
+ .asInstanceOf[TestCredentialProvider]
+ // Only TestCredentialProvider can report the time of the next token renewal.
+ val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
+ nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
+ }
+
+ test("obtain tokens For HiveMetastore") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+ // Thrift picks up on port 0 and bails out without trying to talk to the endpoint.
+ hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+
+ val hiveCredentialProvider = new HiveCredentialProvider()
+ val credentials = new Credentials()
+ hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials)
+
+ credentials.getAllTokens.size() should be (0)
+ }
+
+ test("Obtain tokens For HBase") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("hbase.security.authentication", "kerberos")
+
+ val hbaseTokenProvider = new HBaseCredentialProvider()
+ val creds = new Credentials()
+ hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
+
+ creds.getAllTokens.size should be (0)
+ }
+}
+
+class TestCredentialProvider extends ServiceCredentialProvider {
+ val tokenRenewalInterval = 86400 * 1000L
+ var timeOfNextTokenRenewal = 0L
+
+ override def serviceName: String = "test"
+
+ override def credentialsRequired(conf: Configuration): Boolean = true
+
+ override def obtainCredentials(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = {
+ if (creds == null) {
+ // Guard against null credentials leaking in from other unit tests.
+ return None
+ }
+
+ val emptyToken = new Token()
+ emptyToken.setService(new Text("test"))
+ creds.addToken(emptyToken.getService, emptyToken)
+
+ val currTime = System.currentTimeMillis()
+ timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval
+
+ Some(timeOfNextTokenRenewal)
+ }
+}
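Taken together, the suite exercises the configuration surface for toggling providers by service name. A short sketch of the key patterns it relies on (key names copied from the tests above; everything else is illustrative):

object ProviderToggleSketch {
  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Current pattern: spark.yarn.security.credentials.<service>.enabled
    .set("spark.yarn.security.credentials.hive.enabled", "false")
    // Deprecated pattern, still honored per the "using deprecated
    // configurations" test: spark.yarn.security.tokens.<service>.enabled
    .set("spark.yarn.security.tokens.hdfs.enabled", "false")

  // A provider registered via META-INF/services and left enabled, such as
  // TestCredentialProvider ("test"), then remains discoverable:
  //   new ConfigurableCredentialManager(conf, new Configuration())
  //     .getServiceCredentialProvider("test")   // Some(TestCredentialProvider)
}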
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
new file mode 100644
index 0000000000..7b2da3f26e
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.{Matchers, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class HDFSCredentialProviderSuite
+ extends SparkFunSuite
+ with PrivateMethodTester
+ with Matchers {
+ private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
+
+ private def getTokenRenewer(
+ hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
+ hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+ }
+
+ private var hdfsCredentialProvider: HDFSCredentialProvider = null
+
+ override def beforeAll() {
+ super.beforeAll()
+
+ if (hdfsCredentialProvider == null) {
+ hdfsCredentialProvider = new HDFSCredentialProvider()
+ }
+ }
+
+ override def afterAll() {
+ if (hdfsCredentialProvider != null) {
+ hdfsCredentialProvider = null
+ }
+
+ super.afterAll()
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val caught =
+ intercept[SparkException] {
+ getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
+}
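These two tests fix the contract of the private getTokenRenewer method: return the ResourceManager's Kerberos principal, or fail fast when none is configured. A plausible sketch of that contract, assumed only from the assertions above (the real HDFSCredentialProvider may additionally resolve _HOST placeholders through Hadoop's SecurityUtil):

object TokenRenewerSketch {
  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.SparkException

  def getTokenRenewerSketch(conf: Configuration): String = {
    // The first test sets yarn.resourcemanager.principal and expects it back
    // verbatim; the second leaves it unset and expects a SparkException with
    // exactly this message.
    val renewer = conf.get("yarn.resourcemanager.principal")
    if (renewer == null || renewer.isEmpty) {
      throw new SparkException(
        "Can't get Master Kerberos principal for use as renewer")
    }
    renewer
  }
}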