aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorhuangzhaowei <carlmartinmax@gmail.com>2015-06-30 11:46:22 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-06-30 11:46:22 -0700
commitd16a9443750eebb7a3d7688d4b98a2ac39cc0da7 (patch)
treef4d54a749ab861d69161a5088c729be3367b2a0b /streaming
parent57264400ac7d9f9c59c387c252a9ed8d93fed4fa (diff)
downloadspark-d16a9443750eebb7a3d7688d4b98a2ac39cc0da7.tar.gz
spark-d16a9443750eebb7a3d7688d4b98a2ac39cc0da7.tar.bz2
spark-d16a9443750eebb7a3d7688d4b98a2ac39cc0da7.zip
[SPARK-8619] [STREAMING] Don't recover keytab and principal configuration within Streaming checkpoint
[Client.scala](https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L786) will change these configurations, so this would cause the problem that the Streaming recover logic can't find the local keytab file(since configuration was changed) ```scala sparkConf.set("spark.yarn.keytab", keytabFileName) sparkConf.set("spark.yarn.principal", args.principal) ``` Problem described at [Jira](https://issues.apache.org/jira/browse/SPARK-8619) Author: huangzhaowei <carlmartinmax@gmail.com> Closes #7008 from SaintBacchus/SPARK-8619 and squashes the following commits: d50dbdf [huangzhaowei] Delect one blank space 9b8e92c [huangzhaowei] Fix code style and add a short comment. 0d8f800 [huangzhaowei] Don't recover keytab and principal configuration within Streaming checkpoint.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala16
1 files changed, 14 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index d8dc4e4101..5279331c9e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -44,11 +44,23 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val sparkConfPairs = ssc.conf.getAll
def createSparkConf(): SparkConf = {
+
+ // Reload properties for the checkpoint application since user wants to set a reload property
+ // or spark had changed its value and user wants to set it back.
+ val propertiesToReload = List(
+ "spark.master",
+ "spark.yarn.keytab",
+ "spark.yarn.principal")
+
val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
.remove("spark.driver.host")
.remove("spark.driver.port")
- val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master")
- newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) }
+ val newReloadConf = new SparkConf(loadDefaults = true)
+ propertiesToReload.foreach { prop =>
+ newReloadConf.getOption(prop).foreach { value =>
+ newSparkConf.set(prop, value)
+ }
+ }
newSparkConf
}