aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala16
1 files changed, 14 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index d8dc4e4101..5279331c9e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -44,11 +44,23 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val sparkConfPairs = ssc.conf.getAll
def createSparkConf(): SparkConf = {
+
+ // Reload properties for the checkpoint application since user wants to set a reload property
+ // or spark had changed its value and user wants to set it back.
+ val propertiesToReload = List(
+ "spark.master",
+ "spark.yarn.keytab",
+ "spark.yarn.principal")
+
val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
.remove("spark.driver.host")
.remove("spark.driver.port")
- val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master")
- newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) }
+ val newReloadConf = new SparkConf(loadDefaults = true)
+ propertiesToReload.foreach { prop =>
+ newReloadConf.getOption(prop).foreach { value =>
+ newSparkConf.set(prop, value)
+ }
+ }
newSparkConf
}