author     Steve Loughran <stevel@hortonworks.com>   2015-10-31 18:23:15 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>      2015-10-31 18:23:15 -0700
commit     40d3c6797a3dfd037eb69b2bcd336d8544deddf5 (patch)
tree       39243aae057c6d6314861579e536773e952cf6a9 /yarn
parent     fc27dfbf0f8d3f96c70e27d88f7d0316c97ddb1e (diff)
[SPARK-11265][YARN] YarnClient can't get tokens to talk to Hive 1.2.1 in a secure cluster
This is a fix for SPARK-11265: the introspection code used to get Hive delegation tokens fails on Spark 1.5.1+, due to changes in the Hive codebase.

Author: Steve Loughran <stevel@hortonworks.com>

Closes #9232 from steveloughran/stevel/patches/SPARK-11265-hive-tokens.
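At its core, the patch replaces roughly 50 lines of inline reflection in Client.scala with a call to a new, separately testable helper on YarnSparkHadoopUtil. The refactored call site (shown in full in the Client.scala hunk below; conf and credentials are the enclosing method's parameters) reduces to:

    YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach { token =>
      credentials.addToken(new Text("hive.server2.delegation.token"), token)
    }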
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/pom.xml                                                                     | 25
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                   | 51
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala      | 73
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 29
4 files changed, 129 insertions(+), 49 deletions(-)
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 3eadacba13..989b820bec 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -162,6 +162,31 @@
<artifactId>jersey-server</artifactId>
<scope>test</scope>
</dependency>
+
+ <!--
+ Testing the Hive reflection code needs Hive on the test classpath only.
+ It doesn't need the spark hive modules, so the -Phive flag is not checked.
+ -->
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-exec</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4954b61809..a3f33d8018 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1337,55 +1337,8 @@ object Client extends Logging {
conf: Configuration,
credentials: Credentials) {
if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
- val mirror = universe.runtimeMirror(getClass.getClassLoader)
-
- try {
- val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
- val hiveConf = hiveConfClass.newInstance()
-
- val hiveConfGet = (param: String) => Option(hiveConfClass
- .getMethod("get", classOf[java.lang.String])
- .invoke(hiveConf, param))
-
- val metastore_uri = hiveConfGet("hive.metastore.uris")
-
- // Check for local metastore
- if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
- val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
- val hive = hiveClass.getMethod("get").invoke(null, hiveConf.asInstanceOf[Object])
-
- val metastore_kerberos_principal_conf_var = mirror.classLoader
- .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
- .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString
-
- val principal = hiveConfGet(metastore_kerberos_principal_conf_var)
-
- val username = Option(UserGroupInformation.getCurrentUser().getUserName)
- if (principal != None && username != None) {
- val tokenStr = hiveClass.getMethod("getDelegationToken",
- classOf[java.lang.String], classOf[java.lang.String])
- .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]
-
- val hive2Token = new Token[DelegationTokenIdentifier]()
- hive2Token.decodeFromUrlString(tokenStr)
- credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
- logDebug("Added hive.Server2.delegation.token to conf.")
- hiveClass.getMethod("closeCurrent").invoke(null)
- } else {
- logError("Username or principal == NULL")
- logError(s"""username=${username.getOrElse("(NULL)")}""")
- logError(s"""principal=${principal.getOrElse("(NULL)")}""")
- throw new IllegalArgumentException("username and/or principal is equal to null!")
- }
- } else {
- logDebug("HiveMetaStore configured in localmode")
- }
- } catch {
- case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
- case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
- case e: Exception => { logError("Unexpected Exception " + e)
- throw new RuntimeException("Unexpected exception", e)
- }
+ YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
+ credentials.addToken(new Text("hive.server2.delegation.token"), _)
}
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index f276e7efde..5924daf3ec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -22,14 +22,17 @@ import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.HashMap
+import scala.reflect.runtime._
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{Master, JobConf}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.Token
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -142,6 +145,76 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
ConverterUtils.toContainerId(containerIdString)
}
+
+ /**
+ * Obtains a token for the Hive metastore, using the current user as the principal.
+ * Some exceptions are caught and downgraded to a log message.
+ * @param conf hadoop configuration; the Hive configuration will be based on this
+ * @return a token, or `None` if there's no need for a token (no metastore URI or principal
+ * in the config), or if a binding exception was caught and downgraded.
+ */
+ def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
+ try {
+ obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+ } catch {
+ case e: ClassNotFoundException =>
+ logInfo(s"Hive class not found $e")
+ logDebug("Hive class not found", e)
+ None
+ }
+ }
+
+ /**
+ * Inner routine to obtain a token for the Hive metastore; exceptions are raised on any problem.
+ * @param conf hadoop configuration; the Hive configuration will be based on this.
+ * @param username the username of the principal requesting the delegation token.
+ * @return a delegation token
+ */
+ private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
+ username: String): Option[Token[DelegationTokenIdentifier]] = {
+ val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+
+ // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
+ // to a Configuration and used without reflection
+ val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+ // using the (Configuration, Class) constructor allows the current configuration to be included
+ // in the hive config.
+ val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
+ classOf[Object].getClass) // i.e. the Class object for java.lang.Class, matching HiveConf(Configuration, Class<?>)
+ val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration]
+ val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")
+
+ // Check for local metastore
+ if (metastoreUri.nonEmpty) {
+ require(username.nonEmpty, "Username undefined")
+ val principalKey = "hive.metastore.kerberos.principal"
+ val principal = hiveConf.getTrimmed(principalKey, "")
+ require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+ logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
+ val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+ val closeCurrent = hiveClass.getMethod("closeCurrent")
+ try {
+ // get all the instance methods before invoking any
+ val getDelegationToken = hiveClass.getMethod("getDelegationToken",
+ classOf[String], classOf[String])
+ val getHive = hiveClass.getMethod("get", hiveConfClass)
+
+ // invoke
+ val hive = getHive.invoke(null, hiveConf)
+ val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
+ val hive2Token = new Token[DelegationTokenIdentifier]()
+ hive2Token.decodeFromUrlString(tokenStr)
+ Some(hive2Token)
+ } finally {
+ Utils.tryLogNonFatalError {
+ closeCurrent.invoke(null)
+ }
+ }
+ } else {
+ logDebug("HiveMetaStore configured in localmode")
+ None
+ }
+ }
}
object YarnSparkHadoopUtil {
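The string/token handoff in obtainTokenForHiveMetastoreInner relies on Hadoop's URL-safe token encoding: Hive's getDelegationToken hands back the token as an encoded string, and Token.decodeFromUrlString reverses that encoding. A minimal illustrative sketch of the round trip (not part of the patch):

    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
    import org.apache.hadoop.security.token.Token

    val original = new Token[DelegationTokenIdentifier]()
    val encoded = original.encodeToUrlString()      // URL-safe base64 string form
    val decoded = new Token[DelegationTokenIdentifier]()
    decoded.decodeFromUrlString(encoded)            // restores the token fields
    assert(decoded == original)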
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index e1c67db765..9132c56a91 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -18,10 +18,12 @@
package org.apache.spark.deploy.yarn
import java.io.{File, IOException}
+import java.lang.reflect.InvocationTargetException
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -245,4 +247,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
System.clearProperty("SPARK_YARN_MODE")
}
}
+
+ test("Obtain tokens For HiveMetastore") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+ // thrift picks up on port 0 and bails out, without trying to talk to the endpoint
+ hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+ val util = new YarnSparkHadoopUtil
+ assertNestedHiveException(intercept[InvocationTargetException] {
+ util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+ })
+ // expect the exception-trapping code to let this hive-side exception propagate
+ assertNestedHiveException(intercept[InvocationTargetException] {
+ util.obtainTokenForHiveMetastore(hadoopConf)
+ })
+ }
+
+ def assertNestedHiveException(e: InvocationTargetException): Throwable = {
+ val inner = e.getCause
+ if (inner == null) {
+ fail("No inner cause", e)
+ }
+ if (!inner.isInstanceOf[HiveException]) {
+ fail("Not a hive exception", inner)
+ }
+ inner
+ }
+
}
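Note the asymmetry these tests pin down: obtainTokenForHiveMetastore downgrades only ClassNotFoundException (no Hive on the classpath) to a log message; genuine Hive-side failures still escape, wrapped by reflection in an InvocationTargetException. A caller wanting to treat those as non-fatal would need its own guard, e.g. (a hypothetical sketch, not part of the patch; assumes a Logging trait in scope):

    import java.lang.reflect.InvocationTargetException

    val hiveToken =
      try {
        YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf)
      } catch {
        case e: InvocationTargetException =>
          // hypothetical handling: log the underlying Hive failure and continue
          logWarning("Failed to obtain Hive metastore token", e.getCause)
          None
      }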