author     jerryshao <sshao@hortonworks.com>      2016-08-10 15:39:30 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>   2016-08-10 15:39:30 -0700
commit     ab648c0004cfb20d53554ab333dd2d198cb94ffa (patch)
tree       74fa18e0a21caedaca6eda3557d60c9bd3af07b0 /yarn/src/test/scala/org
parent     bd2c12fb4994785d5becce541aee9ba73fef1c4c (diff)
[SPARK-14743][YARN] Add a configurable credential manager for Spark running on YARN
## What changes were proposed in this pull request?

Add a configurable token manager for Spark running on YARN.

### Current Problems ###

1. The supported token providers are hard-coded; currently only hdfs, hbase and hive are supported, and it is impossible for a user to add a new token provider without code changes.
2. The same problem exists in the timely token renewer and updater.

### Changes In This Proposal ###

To address the problems mentioned above and make the current code cleaner and easier to understand, this proposal mainly makes 3 changes:

1. Abstract a `ServiceTokenProvider` interface, as well as a `ServiceTokenRenewable` interface, for token providers. Each service that wants to communicate with Spark through tokens needs to implement this interface.
2. Provide a `ConfigurableTokenManager` to manage all the registered token providers, as well as the token renewer and updater. This class also offers APIs for other modules to obtain tokens, get renewal intervals and so on.
3. Implement 3 built-in token providers, `HDFSTokenProvider`, `HiveTokenProvider` and `HBaseTokenProvider`, to keep the same semantics as supported today. Whether to load these built-in token providers is controlled by the configuration `spark.yarn.security.tokens.${service}.enabled`; by default all the built-in token providers are loaded.

### Behavior Changes ###

For the end user there is no behavior change: we still use the same configuration `spark.yarn.security.tokens.${service}.enabled` to decide which token provider is enabled (hbase or hive). A user-implemented token provider (assume the provider is named "test") that needs to be registered with this class requires two configurations:

1. `spark.yarn.security.tokens.test.enabled` set to `true`
2. `spark.yarn.security.tokens.test.class` set to the fully qualified class name

So we keep the same semantics as the current code while adding only one new configuration.

### Current Status ###

- [x] Token provider interface and management framework.
- [x] Implement built-in token providers (hdfs, hbase, hive).
- [x] Coverage of unit tests.
- [x] Integration test with a secure cluster.

## How was this patch tested?

Unit tests and integration tests. Please suggest and review, any comment is greatly appreciated.

Author: jerryshao <sshao@hortonworks.com>

Closes #14065 from jerryshao/SPARK-16342.
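To make the extension point concrete, here is a minimal sketch of a user-implemented provider, written against the `ServiceCredentialProvider` interface that the new test suites below exercise. The class name `EchoCredentialProvider` and the service name "echo" are hypothetical; a real provider would request an actual delegation token from its service:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.Token

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// Hypothetical provider for a service named "echo".
class EchoCredentialProvider extends ServiceCredentialProvider {
  // Name used in the spark.yarn.security.credentials.<serviceName>.enabled key
  // (see the "disable hive credential provider" test below).
  override def serviceName: String = "echo"

  // Return true only when the target service actually needs a token;
  // always true here to keep the sketch small.
  override def credentialsRequired(conf: Configuration): Boolean = true

  // Add this service's token to `creds` and return the timestamp of the
  // next renewal, or None if the token never needs renewing.
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    val token = new Token()  // a real provider would fetch one from its service
    token.setService(new Text(serviceName))
    creds.addToken(token.getService, token)
    None
  }
}
```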
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala                   |  97
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala | 150
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala        |  71
3 files changed, 222 insertions, 96 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index fe09808ae5..7fbbe12609 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -18,13 +18,9 @@
package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
-import java.lang.reflect.InvocationTargetException
import java.nio.charset.StandardCharsets
import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.io.Text
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -32,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
-import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -173,64 +169,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
    }
  }
- test("check access nns empty") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns unset") {
- val sparkConf = new SparkConf()
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set())
- }
-
- test("check access nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access nns space") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032")))
- }
-
- test("check access two nns") {
- val sparkConf = new SparkConf()
- sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val util = new YarnSparkHadoopUtil
- val nns = util.getNameNodesToAccess(sparkConf)
- nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val util = new YarnSparkHadoopUtil
- val renewer = util.getTokenRenewer(hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val util = new YarnSparkHadoopUtil
- val caught =
- intercept[SparkException] {
- util.getTokenRenewer(hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
-
test("check different hadoop utils based on env variable") {
try {
System.setProperty("SPARK_YARN_MODE", "true")
@@ -242,40 +180,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
    }
  }
- test("Obtain tokens For HiveMetastore") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hive.metastore.kerberos.principal", "bob")
- // thrift picks up on port 0 and bails out, without trying to talk to endpoint
- hadoopConf.set("hive.metastore.uris", "http://localhost:0")
- val util = new YarnSparkHadoopUtil
- assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastoreInner(hadoopConf)
- })
- assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastore(hadoopConf)
- })
- }
- private def assertNestedHiveException(e: InvocationTargetException): Throwable = {
- val inner = e.getCause
- if (inner == null) {
- fail("No inner cause", e)
- }
- if (!inner.isInstanceOf[HiveException]) {
- fail("Not a hive exception", inner)
- }
- inner
- }
-
- test("Obtain tokens For HBase") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hbase.security.authentication", "kerberos")
- val util = new YarnSparkHadoopUtil
- intercept[ClassNotFoundException] {
- util.obtainTokenForHBaseInner(hadoopConf)
- }
- util.obtainTokenForHBase(hadoopConf) should be (None)
- }
  // This test needs to live here because it depends on isYarnMode returning true, which can only
  // happen in the YARN module.
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
new file mode 100644
index 0000000000..db4619e80c
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.Token
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
+
+class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+  private var credentialManager: ConfigurableCredentialManager = null
+  private var sparkConf: SparkConf = null
+  private var hadoopConf: Configuration = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sparkConf = new SparkConf()
+    hadoopConf = new Configuration()
+    System.setProperty("SPARK_YARN_MODE", "true")
+  }
+
+  override def afterAll(): Unit = {
+    System.clearProperty("SPARK_YARN_MODE")
+
+    super.afterAll()
+  }
+
+  test("Correctly load default credential providers") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+    credentialManager.getServiceCredentialProvider("hive") should not be (None)
+  }
+
+  test("disable hive credential provider") {
+    sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+    credentialManager.getServiceCredentialProvider("hive") should be (None)
+  }
+
+  test("using deprecated configurations") {
+    sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+    sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+    credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+    credentialManager.getServiceCredentialProvider("hive") should be (None)
+    credentialManager.getServiceCredentialProvider("test") should not be (None)
+    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+  }
+
+  test("verify obtaining credentials from provider") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+    val creds = new Credentials()
+
+    // Tokens can only be obtained from TestCredentialProvider; tokens for hdfs,
+    // hbase and hive cannot be obtained.
+    credentialManager.obtainCredentials(hadoopConf, creds)
+    val tokens = creds.getAllTokens
+    tokens.size() should be (1)
+    tokens.iterator().next().getService should be (new Text("test"))
+  }
+
+  test("verify getting credential renewal info") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+    val creds = new Credentials()
+
+    val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get
+      .asInstanceOf[TestCredentialProvider]
+    // Only TestCredentialProvider can report the time of next token renewal.
+    val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
+    nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
+  }
+
+  test("obtain tokens For HiveMetastore") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+    // thrift picks up on port 0 and bails out, without trying to talk to endpoint
+    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+
+    val hiveCredentialProvider = new HiveCredentialProvider()
+    val credentials = new Credentials()
+    hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials)
+
+    credentials.getAllTokens.size() should be (0)
+  }
+
+  test("Obtain tokens For HBase") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hbase.security.authentication", "kerberos")
+
+    val hbaseTokenProvider = new HBaseCredentialProvider()
+    val creds = new Credentials()
+    hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
+
+    creds.getAllTokens.size should be (0)
+  }
+}
+
+class TestCredentialProvider extends ServiceCredentialProvider {
+  val tokenRenewalInterval = 86400 * 1000L
+  var timeOfNextTokenRenewal = 0L
+
+  override def serviceName: String = "test"
+
+  override def credentialsRequired(conf: Configuration): Boolean = true
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    if (creds == null) {
+      // Guard against failures leaking in from other unit tests.
+      return None
+    }
+
+    val emptyToken = new Token()
+    emptyToken.setService(new Text("test"))
+    creds.addToken(emptyToken.getService, emptyToken)
+
+    val currTime = System.currentTimeMillis()
+    timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval
+
+    Some(timeOfNextTokenRenewal)
+  }
+}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
new file mode 100644
index 0000000000..7b2da3f26e
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.{Matchers, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class HDFSCredentialProviderSuite
+    extends SparkFunSuite
+    with PrivateMethodTester
+    with Matchers {
+  private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
+
+  private def getTokenRenewer(
+      hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
+    hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+  }
+
+  private var hdfsCredentialProvider: HDFSCredentialProvider = null
+
+  override def beforeAll() {
+    super.beforeAll()
+
+    if (hdfsCredentialProvider == null) {
+      hdfsCredentialProvider = new HDFSCredentialProvider()
+    }
+  }
+
+  override def afterAll() {
+    if (hdfsCredentialProvider != null) {
+      hdfsCredentialProvider = null
+    }
+
+    super.afterAll()
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+    val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+    renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+  }
+
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+  }
+}
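For context, here is a minimal sketch of how a caller drives the new manager, based solely on how `ConfigurableCredentialManagerSuite` above exercises the API (the surrounding `CredentialManagerSketch` scaffolding is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager

object CredentialManagerSketch {
  def main(args: Array[String]): Unit = {
    // The suite sets this property before constructing the manager.
    System.setProperty("SPARK_YARN_MODE", "true")

    val sparkConf = new SparkConf()
    val hadoopConf = new Configuration()

    // Loads every enabled provider (hdfs, hbase, hive, plus any
    // user-registered ones) and aggregates their tokens into `creds`.
    val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
    val creds = new Credentials()

    // Per the renewal-info test above, the return value is the time of the
    // next token renewal reported by the providers.
    val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
    println(s"Obtained ${creds.getAllTokens.size()} token(s); next renewal at $nextRenewal")
  }
}
```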