aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala17
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala24
2 files changed, 37 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 84ae122f44..09d2ec90c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
-import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
+import org.apache.spark.{SparkException, SparkUserAppException, SPARK_VERSION}
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -521,8 +521,19 @@ object SparkSubmit {
sysProps.put("spark.yarn.isPython", "true")
}
if (args.principal != null) {
- require(args.keytab != null, "Keytab must be specified when the keytab is specified")
- UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ require(args.keytab != null, "Keytab must be specified when principal is specified")
+ if (!new File(args.keytab).exists()) {
+ throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
+ } else {
+ // Add keytab and principal configurations in sysProps to make them available
+ // for later use; e.g. in spark sql, the isolated class loader used to talk
+ // to HiveMetastore will use these settings. They will be set as Java system
+ // properties and then loaded by SparkConf
+ sysProps.put("spark.yarn.keytab", args.keytab)
+ sysProps.put("spark.yarn.principal", args.principal)
+
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ }
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index f1c2489b38..598ccdeee4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -32,9 +32,10 @@ import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.{Driver, metadata}
import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.VersionInfo
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, SparkException, Logging}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -149,6 +150,27 @@ private[hive] class ClientWrapper(
val original = Thread.currentThread().getContextClassLoader
// Switch to the initClassLoader.
Thread.currentThread().setContextClassLoader(initClassLoader)
+
+ // Set up kerberos credentials for UserGroupInformation.loginUser within
+ // current class loader
+ // Instead of using the spark conf of the current spark context, a new
+ // instance of SparkConf is needed for the original value of spark.yarn.keytab
+ // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
+ // keytab configuration for the link name in distributed cache
+ val sparkConf = new SparkConf
+ if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+ val principalName = sparkConf.get("spark.yarn.principal")
+ val keytabFileName = sparkConf.get("spark.yarn.keytab")
+ if (!new File(keytabFileName).exists()) {
+ throw new SparkException(s"Keytab file: ${keytabFileName}" +
+ " specified in spark.yarn.keytab does not exist")
+ } else {
+ logInfo("Attempting to login to Kerberos" +
+ s" using principal: ${principalName} and keytab: ${keytabFileName}")
+ UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
+ }
+ }
+
val ret = try {
val initialConf = new HiveConf(classOf[SessionState])
// HiveConf is a Hadoop Configuration, which has a field of classLoader and