aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorSteve Loughran <stevel@hortonworks.com>2015-12-09 10:25:38 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-09 10:25:38 -0800
commit442a7715a590ba2ea2446c73b1f914a16ae0ed4b (patch)
tree68e4cd172f2826abfae7eefeb57e912622077884 /yarn
parent6900f0173790ad2fa4c79a426bd2dec2d149daa2 (diff)
downloadspark-442a7715a590ba2ea2446c73b1f914a16ae0ed4b.tar.gz
spark-442a7715a590ba2ea2446c73b1f914a16ae0ed4b.tar.bz2
spark-442a7715a590ba2ea2446c73b1f914a16ae0ed4b.zip
[SPARK-12241][YARN] Improve failure reporting in Yarn client obtainTokenForHBase()
This lines up the HBase token logic with that done for Hive in SPARK-11265: reflection with only CFNE being swallowed. There is a test, one which doesn't try to put HBase on the yarn/test class and really do the reflection (the way the hive introspection does). If people do want that then it could be added with careful POM work +also: cut an incorrect comment from the Hive test case before copying it, and a couple of imports that may have been related to the hive test in the past. Author: Steve Loughran <stevel@hortonworks.com> Closes #10227 from steveloughran/stevel/patches/SPARK-12241-obtainTokenForHBase.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala32
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala51
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala12
3 files changed, 64 insertions, 31 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f0590d2d22..7742ec92eb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1369,40 +1369,16 @@ object Client extends Logging {
}
/**
- * Obtain security token for HBase.
+ * Obtain a security token for HBase.
*/
def obtainTokenForHBase(
sparkConf: SparkConf,
conf: Configuration,
credentials: Credentials): Unit = {
if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) {
- val mirror = universe.runtimeMirror(getClass.getClassLoader)
-
- try {
- val confCreate = mirror.classLoader.
- loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
- getMethod("create", classOf[Configuration])
- val obtainToken = mirror.classLoader.
- loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
- getMethod("obtainToken", classOf[Configuration])
-
- logDebug("Attempting to fetch HBase security token.")
-
- val hbaseConf = confCreate.invoke(null, conf).asInstanceOf[Configuration]
- if ("kerberos" == hbaseConf.get("hbase.security.authentication")) {
- val token = obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]]
- credentials.addToken(token.getService, token)
- logInfo("Added HBase security token to credentials.")
- }
- } catch {
- case e: java.lang.NoSuchMethodException =>
- logInfo("HBase Method not found: " + e)
- case e: java.lang.ClassNotFoundException =>
- logDebug("HBase Class not found: " + e)
- case e: java.lang.NoClassDefFoundError =>
- logDebug("HBase Class not found: " + e)
- case e: Exception =>
- logError("Exception when obtaining HBase security token: " + e)
+ YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
+ credentials.addToken(token.getService, token)
+ logInfo("Added HBase security token to credentials.")
}
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a290ebeec9..36a2d61429 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{Master, JobConf}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.Token
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -216,6 +216,55 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
None
}
}
+
+ /**
+ * Obtain a security token for HBase.
+ *
+ * Requirements
+ *
+ * 1. `"hbase.security.authentication" == "kerberos"`
+ * 2. The HBase classes `HBaseConfiguration` and `TokenUtil` could be loaded
+ * and invoked.
+ *
+ * @param conf Hadoop configuration; an HBase configuration is created
+ * from this.
+ * @return a token if the requirements were met, `None` if not.
+ */
+ def obtainTokenForHBase(conf: Configuration): Option[Token[TokenIdentifier]] = {
+ try {
+ obtainTokenForHBaseInner(conf)
+ } catch {
+ case e: ClassNotFoundException =>
+ logInfo(s"HBase class not found $e")
+ logDebug("HBase class not found", e)
+ None
+ }
+ }
+
+ /**
+ * Obtain a security token for HBase if `"hbase.security.authentication" == "kerberos"`
+ *
+ * @param conf Hadoop configuration; an HBase configuration is created
+ * from this.
+ * @return a token if one was needed
+ */
+ def obtainTokenForHBaseInner(conf: Configuration): Option[Token[TokenIdentifier]] = {
+ val mirror = universe.runtimeMirror(getClass.getClassLoader)
+ val confCreate = mirror.classLoader.
+ loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
+ getMethod("create", classOf[Configuration])
+ val obtainToken = mirror.classLoader.
+ loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
+ getMethod("obtainToken", classOf[Configuration])
+ val hbaseConf = confCreate.invoke(null, conf).asInstanceOf[Configuration]
+ if ("kerberos" == hbaseConf.get("hbase.security.authentication")) {
+ logDebug("Attempting to fetch HBase security token.")
+ Some(obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]])
+ } else {
+ None
+ }
+ }
+
}
object YarnSparkHadoopUtil {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index a70e66d39a..3fafc91a16 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.io.Text
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
@@ -259,7 +258,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
})
- // expect exception trapping code to unwind this hive-side exception
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastore(hadoopConf)
})
@@ -276,6 +274,16 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
inner
}
+ test("Obtain tokens For HBase") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("hbase.security.authentication", "kerberos")
+ val util = new YarnSparkHadoopUtil
+ intercept[ClassNotFoundException] {
+ util.obtainTokenForHBaseInner(hadoopConf)
+ }
+ util.obtainTokenForHBase(hadoopConf) should be (None)
+ }
+
// This test needs to live here because it depends on isYarnMode returning true, which can only
// happen in the YARN module.
test("security manager token generation") {