From c7fccb56cd9260b8d72572e65f8e46a14707b9a5 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 29 Feb 2016 13:01:27 -0800 Subject: [SPARK-13478][YARN] Use real user when fetching delegation tokens. The Hive client library is not smart enough to notice that the current user is a proxy user; so when using a proxy user, it fails to fetch delegation tokens from the metastore because of a missing kerberos TGT for the current user. To fix it, just run the code that fetches the delegation token as the real logged in user. Tested on a kerberos cluster both submitting normally and with a proxy user; Hive and HBase tokens are retrieved correctly in both cases. Author: Marcelo Vanzin Closes #11358 from vanzin/SPARK-13478. --- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 46 ++++++++++++++++------ .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 2 +- 2 files changed, 36 insertions(+), 12 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4c9432dbd6..aef78fdfd4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -18,7 +18,9 @@ package org.apache.spark.deploy.yarn import java.io.File +import java.lang.reflect.UndeclaredThrowableException import java.nio.charset.StandardCharsets.UTF_8 +import java.security.PrivilegedExceptionAction import java.util.regex.Matcher import java.util.regex.Pattern @@ -194,7 +196,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { */ def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = { try { - obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName) + obtainTokenForHiveMetastoreInner(conf) } catch { case e: ClassNotFoundException => logInfo(s"Hive class not found $e") @@ -209,8 +211,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { * @param username the username of the principal requesting the delegating token. * @return a delegation token */ - private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration, - username: String): Option[Token[DelegationTokenIdentifier]] = { + private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration): + Option[Token[DelegationTokenIdentifier]] = { val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down @@ -225,11 +227,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { // Check for local metastore if (metastoreUri.nonEmpty) { - require(username.nonEmpty, "Username undefined") val principalKey = "hive.metastore.kerberos.principal" val principal = hiveConf.getTrimmed(principalKey, "") require(principal.nonEmpty, "Hive principal $principalKey undefined") - logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri") + val currentUser = UserGroupInformation.getCurrentUser() + logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " + + s"$principal at $metastoreUri") val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive") val closeCurrent = hiveClass.getMethod("closeCurrent") try { @@ -238,12 +241,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { classOf[String], classOf[String]) val getHive = hiveClass.getMethod("get", hiveConfClass) - // invoke - val hive = getHive.invoke(null, hiveConf) - val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String] - val hive2Token = new Token[DelegationTokenIdentifier]() - hive2Token.decodeFromUrlString(tokenStr) - Some(hive2Token) + doAsRealUser { + val hive = getHive.invoke(null, hiveConf) + val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal) + .asInstanceOf[String] + val hive2Token = new Token[DelegationTokenIdentifier]() + hive2Token.decodeFromUrlString(tokenStr) + Some(hive2Token) + } } finally { Utils.tryLogNonFatalError { closeCurrent.invoke(null) @@ -303,6 +308,25 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } } + /** + * Run some code as the real logged in user (which may differ from the current user, for + * example, when using proxying). + */ + private def doAsRealUser[T](fn: => T): T = { + val currentUser = UserGroupInformation.getCurrentUser() + val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) + + // For some reason the Scala-generated anonymous class ends up causing an + // UndeclaredThrowableException, even if you annotate the method with @throws. + try { + realUser.doAs(new PrivilegedExceptionAction[T]() { + override def run(): T = fn + }) + } catch { + case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) + } + } + } object YarnSparkHadoopUtil { diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index d3acaf229c..9202bd892f 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -255,7 +255,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging hadoopConf.set("hive.metastore.uris", "http://localhost:0") val util = new YarnSparkHadoopUtil assertNestedHiveException(intercept[InvocationTargetException] { - util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice") + util.obtainTokenForHiveMetastoreInner(hadoopConf) }) assertNestedHiveException(intercept[InvocationTargetException] { util.obtainTokenForHiveMetastore(hadoopConf) -- cgit v1.2.3