about summary refs log tree commit diff
path: root/yarn
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-02-29 13:01:27 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2016-02-29 13:01:27 -0800
commitc7fccb56cd9260b8d72572e65f8e46a14707b9a5 (patch)
treee96053bc0d74ae33b411817268b685e995b24e7a /yarn
parent4bd697da03079c26fd4409dc128dbff28c737701 (diff)
downloadspark-c7fccb56cd9260b8d72572e65f8e46a14707b9a5.tar.gz
spark-c7fccb56cd9260b8d72572e65f8e46a14707b9a5.tar.bz2
spark-c7fccb56cd9260b8d72572e65f8e46a14707b9a5.zip
[SPARK-13478][YARN] Use real user when fetching delegation tokens.
The Hive client library is not smart enough to notice that the current user is a proxy user; so when using a proxy user, it fails to fetch delegation tokens from the metastore because of a missing kerberos TGT for the current user. To fix it, just run the code that fetches the delegation token as the real logged in user. Tested on a kerberos cluster both submitting normally and with a proxy user; Hive and HBase tokens are retrieved correctly in both cases. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #11358 from vanzin/SPARK-13478.
Diffstat (limited to 'yarn')
-rw-r--r-- yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 46
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 2
2 files changed, 36 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4c9432dbd6..aef78fdfd4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,7 +18,9 @@
package org.apache.spark.deploy.yarn
import java.io.File
+import java.lang.reflect.UndeclaredThrowableException
import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
import java.util.regex.Matcher
import java.util.regex.Pattern
@@ -194,7 +196,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
*/
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
try {
- obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+ obtainTokenForHiveMetastoreInner(conf)
} catch {
case e: ClassNotFoundException =>
logInfo(s"Hive class not found $e")
@@ -209,8 +211,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
* @param username the username of the principal requesting the delegating token.
* @return a delegation token
*/
- private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
- username: String): Option[Token[DelegationTokenIdentifier]] = {
+ private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
+ Option[Token[DelegationTokenIdentifier]] = {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -225,11 +227,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Check for local metastore
if (metastoreUri.nonEmpty) {
- require(username.nonEmpty, "Username undefined")
val principalKey = "hive.metastore.kerberos.principal"
val principal = hiveConf.getTrimmed(principalKey, "")
require(principal.nonEmpty, "Hive principal $principalKey undefined")
- logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
+ val currentUser = UserGroupInformation.getCurrentUser()
+ logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
+ s"$principal at $metastoreUri")
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
val closeCurrent = hiveClass.getMethod("closeCurrent")
try {
@@ -238,12 +241,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
classOf[String], classOf[String])
val getHive = hiveClass.getMethod("get", hiveConfClass)
- // invoke
- val hive = getHive.invoke(null, hiveConf)
- val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
- val hive2Token = new Token[DelegationTokenIdentifier]()
- hive2Token.decodeFromUrlString(tokenStr)
- Some(hive2Token)
+ doAsRealUser {
+ val hive = getHive.invoke(null, hiveConf)
+ val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
+ .asInstanceOf[String]
+ val hive2Token = new Token[DelegationTokenIdentifier]()
+ hive2Token.decodeFromUrlString(tokenStr)
+ Some(hive2Token)
+ }
} finally {
Utils.tryLogNonFatalError {
closeCurrent.invoke(null)
@@ -303,6 +308,25 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
}
+ /**
+ * Run some code as the real logged in user (which may differ from the current user, for
+ * example, when using proxying).
+ */
+ private def doAsRealUser[T](fn: => T): T = {
+ val currentUser = UserGroupInformation.getCurrentUser()
+ val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
+
+ // For some reason the Scala-generated anonymous class ends up causing an
+ // UndeclaredThrowableException, even if you annotate the method with @throws.
+ try {
+ realUser.doAs(new PrivilegedExceptionAction[T]() {
+ override def run(): T = fn
+ })
+ } catch {
+ case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+ }
+ }
+
}
object YarnSparkHadoopUtil {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index d3acaf229c..9202bd892f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -255,7 +255,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
val util = new YarnSparkHadoopUtil
assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+ util.obtainTokenForHiveMetastoreInner(hadoopConf)
})
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastore(hadoopConf)