aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala38
1 files changed, 37 insertions, 1 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 741239c953..4abcf7307a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -39,7 +39,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.security.token.Token
+import org.apache.hadoop.security.token.{TokenIdentifier, Token}
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -226,6 +226,7 @@ private[spark] class Client(
val distributedUris = new HashSet[String]
obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)
+ obtainTokenForHBase(hadoopConf, credentials)
val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
fs.getDefaultReplication(dst)).toShort
@@ -1085,6 +1086,41 @@ object Client extends Logging {
}
/**
+ * Obtain security token for HBase.
+ */
+ def obtainTokenForHBase(conf: Configuration, credentials: Credentials): Unit = {
+ if (UserGroupInformation.isSecurityEnabled) {
+ val mirror = universe.runtimeMirror(getClass.getClassLoader)
+
+ try {
+ val confCreate = mirror.classLoader.
+ loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
+ getMethod("create", classOf[Configuration])
+ val obtainToken = mirror.classLoader.
+ loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
+ getMethod("obtainToken", classOf[Configuration])
+
+ logDebug("Attempting to fetch HBase security token.")
+
+ val hbaseConf = confCreate.invoke(null, conf)
+ val token = obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]]
+ credentials.addToken(token.getService, token)
+
+ logInfo("Added HBase security token to credentials.")
+ } catch {
+ case e:java.lang.NoSuchMethodException =>
+ logInfo("HBase Method not found: " + e)
+ case e:java.lang.ClassNotFoundException =>
+ logDebug("HBase Class not found: " + e)
+ case e:java.lang.NoClassDefFoundError =>
+ logDebug("HBase Class not found: " + e)
+ case e:Exception =>
+ logError("Exception when obtaining HBase security token: " + e)
+ }
+ }
+ }
+
+ /**
* Return whether the two file systems are the same.
*/
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {