diff options
author | huangzhaowei <carlmartinmax@gmail.com> | 2016-02-26 07:32:07 -0600 |
---|---|---|
committer | Tom Graves <tgraves@yahoo-inc.com> | 2016-02-26 07:32:07 -0600 |
commit | 5c3912e5c90ce659146c3056430d100604378b71 (patch) | |
tree | 2bc5ceef2398e9c3b9cab271b17a0c3bf60e522c | |
parent | 318bf41158a670e9d62123ea0cb27a833affae24 (diff) | |
download | spark-5c3912e5c90ce659146c3056430d100604378b71.tar.gz spark-5c3912e5c90ce659146c3056430d100604378b71.tar.bz2 spark-5c3912e5c90ce659146c3056430d100604378b71.zip |
[SPARK-12523][YARN] Support long-running of the Spark On HBase and hive meta store.
Obtain the hive metastore and hbase token as well as hdfs token in DelegationToeknRenewer to supoort long-running application of spark on hbase or thriftserver.
Author: huangzhaowei <carlmartinmax@gmail.com>
Closes #10645 from SaintBacchus/SPARK-12523.
3 files changed, 42 insertions, 40 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala index 2ac9e33873..70b67d21ec 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala @@ -172,6 +172,8 @@ private[yarn] class AMDelegationTokenRenewer( override def run(): Void = { val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds) + hadoopUtil.obtainTokenForHiveMetastore(sparkConf, freshHadoopConf, tempCreds) + hadoopUtil.obtainTokenForHBase(sparkConf, freshHadoopConf, tempCreds) null } }) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 530f1d753b..dac3ea2517 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -345,8 +345,8 @@ private[spark] class Client( // multiple times, YARN will fail to launch containers for the app with an internal // error. val distributedUris = new HashSet[String] - obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials) - obtainTokenForHBase(sparkConf, hadoopConf, credentials) + YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials) + YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -1358,35 +1358,6 @@ object Client extends Logging { } /** - * Obtains token for the Hive metastore and adds them to the credentials. - */ - private def obtainTokenForHiveMetastore( - sparkConf: SparkConf, - conf: Configuration, - credentials: Credentials) { - if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) { - YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach { - credentials.addToken(new Text("hive.server2.delegation.token"), _) - } - } - } - - /** - * Obtain a security token for HBase. - */ - def obtainTokenForHBase( - sparkConf: SparkConf, - conf: Configuration, - credentials: Credentials): Unit = { - if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) { - YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token => - credentials.addToken(token.getService, token) - logInfo("Added HBase security token to credentials.") - } - } - } - - /** * Return whether the two file systems are the same. */ private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { @@ -1450,13 +1421,4 @@ object Client extends Logging { components.mkString(Path.SEPARATOR) } - /** - * Return whether delegation tokens should be retrieved for the given service when security is - * enabled. By default, tokens are retrieved, but that behavior can be changed by setting - * a service-specific configuration. - */ - def shouldGetTokens(conf: SparkConf, service: String): Boolean = { - conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true) - } - } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 272f1299e0..4c9432dbd6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -133,6 +133,44 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } } + /** + * Obtains token for the Hive metastore and adds them to the credentials. + */ + def obtainTokenForHiveMetastore( + sparkConf: SparkConf, + conf: Configuration, + credentials: Credentials) { + if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) { + YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach { + credentials.addToken(new Text("hive.server2.delegation.token"), _) + } + } + } + + /** + * Obtain a security token for HBase. + */ + def obtainTokenForHBase( + sparkConf: SparkConf, + conf: Configuration, + credentials: Credentials): Unit = { + if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) { + YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token => + credentials.addToken(token.getService, token) + logInfo("Added HBase security token to credentials.") + } + } + } + + /** + * Return whether delegation tokens should be retrieved for the given service when security is + * enabled. By default, tokens are retrieved, but that behavior can be changed by setting + * a service-specific configuration. + */ + private def shouldGetTokens(conf: SparkConf, service: String): Boolean = { + conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true) + } + private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = { tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf)) tokenRenewer.get.updateCredentialsIfRequired() |