aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala46
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala2
2 files changed, 36 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4c9432dbd6..aef78fdfd4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,7 +18,9 @@
package org.apache.spark.deploy.yarn
import java.io.File
+import java.lang.reflect.UndeclaredThrowableException
import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
import java.util.regex.Matcher
import java.util.regex.Pattern
@@ -194,7 +196,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
*/
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
try {
- obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+ obtainTokenForHiveMetastoreInner(conf)
} catch {
case e: ClassNotFoundException =>
logInfo(s"Hive class not found $e")
@@ -209,8 +211,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
* @param username the username of the principal requesting the delegating token.
* @return a delegation token
*/
- private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
- username: String): Option[Token[DelegationTokenIdentifier]] = {
+ private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
+ Option[Token[DelegationTokenIdentifier]] = {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -225,11 +227,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Check for local metastore
if (metastoreUri.nonEmpty) {
- require(username.nonEmpty, "Username undefined")
val principalKey = "hive.metastore.kerberos.principal"
val principal = hiveConf.getTrimmed(principalKey, "")
require(principal.nonEmpty, "Hive principal $principalKey undefined")
- logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
+ val currentUser = UserGroupInformation.getCurrentUser()
+ logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
+ s"$principal at $metastoreUri")
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
val closeCurrent = hiveClass.getMethod("closeCurrent")
try {
@@ -238,12 +241,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
classOf[String], classOf[String])
val getHive = hiveClass.getMethod("get", hiveConfClass)
- // invoke
- val hive = getHive.invoke(null, hiveConf)
- val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
- val hive2Token = new Token[DelegationTokenIdentifier]()
- hive2Token.decodeFromUrlString(tokenStr)
- Some(hive2Token)
+ doAsRealUser {
+ val hive = getHive.invoke(null, hiveConf)
+ val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
+ .asInstanceOf[String]
+ val hive2Token = new Token[DelegationTokenIdentifier]()
+ hive2Token.decodeFromUrlString(tokenStr)
+ Some(hive2Token)
+ }
} finally {
Utils.tryLogNonFatalError {
closeCurrent.invoke(null)
@@ -303,6 +308,25 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
}
+ /**
+ * Run some code as the real logged in user (which may differ from the current user, for
+ * example, when using proxying).
+ */
+ private def doAsRealUser[T](fn: => T): T = {
+ val currentUser = UserGroupInformation.getCurrentUser()
+ val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
+
+ // For some reason the Scala-generated anonymous class ends up causing an
+ // UndeclaredThrowableException, even if you annotate the method with @throws.
+ try {
+ realUser.doAs(new PrivilegedExceptionAction[T]() {
+ override def run(): T = fn
+ })
+ } catch {
+ case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+ }
+ }
+
}
object YarnSparkHadoopUtil {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index d3acaf229c..9202bd892f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -255,7 +255,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
val util = new YarnSparkHadoopUtil
assertNestedHiveException(intercept[InvocationTargetException] {
- util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+ util.obtainTokenForHiveMetastoreInner(hadoopConf)
})
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastore(hadoopConf)