diff options
author | huangzhaowei <carlmartinmax@gmail.com> | 2015-06-30 11:46:22 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-06-30 11:46:22 -0700 |
commit | d16a9443750eebb7a3d7688d4b98a2ac39cc0da7 (patch) | |
tree | f4d54a749ab861d69161a5088c729be3367b2a0b /streaming | |
parent | 57264400ac7d9f9c59c387c252a9ed8d93fed4fa (diff) | |
download | spark-d16a9443750eebb7a3d7688d4b98a2ac39cc0da7.tar.gz spark-d16a9443750eebb7a3d7688d4b98a2ac39cc0da7.tar.bz2 spark-d16a9443750eebb7a3d7688d4b98a2ac39cc0da7.zip |
[SPARK-8619] [STREAMING] Don't recover keytab and principal configuration within Streaming checkpoint
[Client.scala](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L786) will change these configurations, so this would cause the problem that the Streaming recover logic can't find the local keytab file(since configuration was changed)
```scala
sparkConf.set("spark.yarn.keytab", keytabFileName)
sparkConf.set("spark.yarn.principal", args.principal)
```
Problem described at [Jira](https://issues.apache.org/jira/browse/SPARK-8619)
Author: huangzhaowei <carlmartinmax@gmail.com>
Closes #7008 from SaintBacchus/SPARK-8619 and squashes the following commits:
d50dbdf [huangzhaowei] Delect one blank space
9b8e92c [huangzhaowei] Fix code style and add a short comment.
0d8f800 [huangzhaowei] Don't recover keytab and principal configuration within Streaming checkpoint.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index d8dc4e4101..5279331c9e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -44,11 +44,23 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val sparkConfPairs = ssc.conf.getAll def createSparkConf(): SparkConf = { + + // Reload properties for the checkpoint application since user wants to set a reload property + // or spark had changed its value and user wants to set it back. + val propertiesToReload = List( + "spark.master", + "spark.yarn.keytab", + "spark.yarn.principal") + val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs) .remove("spark.driver.host") .remove("spark.driver.port") - val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master") - newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) } + val newReloadConf = new SparkConf(loadDefaults = true) + propertiesToReload.foreach { prop => + newReloadConf.getOption(prop).foreach { value => + newSparkConf.set(prop, value) + } + } newSparkConf } |