about summary refs log tree commit diff
path: root/yarn
diff options
context:
space:
mode:
authorhuangzhaowei <carlmartinmax@gmail.com>2016-02-26 07:32:07 -0600
committerTom Graves <tgraves@yahoo-inc.com>2016-02-26 07:32:07 -0600
commit5c3912e5c90ce659146c3056430d100604378b71 (patch)
tree2bc5ceef2398e9c3b9cab271b17a0c3bf60e522c /yarn
parent318bf41158a670e9d62123ea0cb27a833affae24 (diff)
downloadspark-5c3912e5c90ce659146c3056430d100604378b71.tar.gz
spark-5c3912e5c90ce659146c3056430d100604378b71.tar.bz2
spark-5c3912e5c90ce659146c3056430d100604378b71.zip
[SPARK-12523][YARN] Support long-running of the Spark On HBase and hive meta store.
Obtain the hive metastore and hbase token as well as hdfs token in DelegationTokenRenewer to support long-running application of spark on hbase or thriftserver. Author: huangzhaowei <carlmartinmax@gmail.com> Closes #10645 from SaintBacchus/SPARK-12523.
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala |  2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                   | 42
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala      | 38
3 files changed, 42 insertions, 40 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 2ac9e33873..70b67d21ec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -172,6 +172,8 @@ private[yarn] class AMDelegationTokenRenewer(
override def run(): Void = {
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
hadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)
+ hadoopUtil.obtainTokenForHiveMetastore(sparkConf, freshHadoopConf, tempCreds)
+ hadoopUtil.obtainTokenForHBase(sparkConf, freshHadoopConf, tempCreds)
null
}
})
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 530f1d753b..dac3ea2517 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -345,8 +345,8 @@ private[spark] class Client(
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
- obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
- obtainTokenForHBase(sparkConf, hadoopConf, credentials)
+ YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
+ YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
fs.getDefaultReplication(dst)).toShort
@@ -1358,35 +1358,6 @@ object Client extends Logging {
}
/**
- * Obtains token for the Hive metastore and adds them to the credentials.
- */
- private def obtainTokenForHiveMetastore(
- sparkConf: SparkConf,
- conf: Configuration,
- credentials: Credentials) {
- if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
- YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
- credentials.addToken(new Text("hive.server2.delegation.token"), _)
- }
- }
- }
-
- /**
- * Obtain a security token for HBase.
- */
- def obtainTokenForHBase(
- sparkConf: SparkConf,
- conf: Configuration,
- credentials: Credentials): Unit = {
- if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) {
- YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
- credentials.addToken(token.getService, token)
- logInfo("Added HBase security token to credentials.")
- }
- }
- }
-
- /**
* Return whether the two file systems are the same.
*/
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
@@ -1450,13 +1421,4 @@ object Client extends Logging {
components.mkString(Path.SEPARATOR)
}
- /**
- * Return whether delegation tokens should be retrieved for the given service when security is
- * enabled. By default, tokens are retrieved, but that behavior can be changed by setting
- * a service-specific configuration.
- */
- def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
- conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
- }
-
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 272f1299e0..4c9432dbd6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -133,6 +133,44 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
}
+ /**
+ * Obtains token for the Hive metastore and adds them to the credentials.
+ */
+ def obtainTokenForHiveMetastore(
+ sparkConf: SparkConf,
+ conf: Configuration,
+ credentials: Credentials) {
+ if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
+ YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
+ credentials.addToken(new Text("hive.server2.delegation.token"), _)
+ }
+ }
+ }
+
+ /**
+ * Obtain a security token for HBase.
+ */
+ def obtainTokenForHBase(
+ sparkConf: SparkConf,
+ conf: Configuration,
+ credentials: Credentials): Unit = {
+ if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) {
+ YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
+ credentials.addToken(token.getService, token)
+ logInfo("Added HBase security token to credentials.")
+ }
+ }
+ }
+
+ /**
+ * Return whether delegation tokens should be retrieved for the given service when security is
+ * enabled. By default, tokens are retrieved, but that behavior can be changed by setting
+ * a service-specific configuration.
+ */
+ private def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
+ conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
+ }
+
private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = {
tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf))
tokenRenewer.get.updateCredentialsIfRequired()