author: jerryshao <sshao@hortonworks.com>	2015-11-05 18:03:12 -0800
committer: Marcelo Vanzin <vanzin@cloudera.com>	2015-11-05 18:03:12 -0800
commit: 468ad0ae874d5cf55712ee976faf77f19c937ccb (patch)
tree: 75766fe6a8b7b6d9baa5f172662c012e036f7100 /streaming
parent: 8fa8c8375d7015a0332aa9ee613d7c6b6d62bae7 (diff)
[SPARK-11457][STREAMING][YARN] Fix incorrect AM proxy filter conf recovery from checkpoint
Currently, the YARN AM proxy filter configuration is recovered from the checkpoint file when a Spark Streaming application is restarted, which leads to unwanted behavior:

1. A wrong RM address if the RM has been redeployed after a failure.
2. A wrong proxyBase, since the application id has changed and the old id recorded in proxyBase is stale.

So instead of recovering these configurations from the checkpoint file, they should be reloaded each time the application starts. This problem only exists in YARN cluster mode; in YARN client mode these configurations are updated through the `AddWebUIFilter` RPC message.

Please help to review tdas harishreedharan vanzin, thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #9412 from jerryshao/SPARK-11457.
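The fix follows a reload-over-checkpoint pattern: rebuild the conf from the checkpointed pairs, then override the properties whose checkpointed values may be stale with freshly loaded ones. The sketch below illustrates that idea only; the helper name recoverConf and its parameters are illustrative, not the actual body of Checkpoint.createSparkConf, while SparkConf, setAll, getOption, and set are the standard Spark API.

import org.apache.spark.SparkConf

// Sketch only: rebuild a SparkConf from checkpointed key/value pairs, then
// override properties whose checkpointed values may be stale (for example
// spark.ui.filters) with values freshly loaded from the current environment.
def recoverConf(
    checkpointedPairs: Seq[(String, String)],
    propertiesToReload: Seq[String]): SparkConf = {
  val recovered = new SparkConf(loadDefaults = false).setAll(checkpointedPairs)
  val fresh = new SparkConf(loadDefaults = true)  // reads current system properties
  propertiesToReload.foreach { prop =>
    fresh.getOption(prop).foreach(value => recovered.set(prop, value))
  }
  recovered
}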
Diffstat (limited to 'streaming')
-rw-r--r--	streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala	13
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b7de6dde61..0cd55d9aec 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -55,7 +55,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
"spark.driver.port",
"spark.master",
"spark.yarn.keytab",
- "spark.yarn.principal")
+ "spark.yarn.principal",
+ "spark.ui.filters")
val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
.remove("spark.driver.host")
@@ -66,6 +67,16 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
newSparkConf.set(prop, value)
}
}
+
+ // Add Yarn proxy filter specific configurations to the recovered SparkConf
+ val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ val filterPrefix = s"spark.$filter.param."
+ newReloadConf.getAll.foreach { case (k, v) =>
+ if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) {
+ newSparkConf.set(k, v)
+ }
+ }
+
newSparkConf
}
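For illustration, the added prefix-matching copy can be exercised on its own, as in the sketch below. The two AmIpFilter parameter keys (PROXY_HOSTS, PROXY_URI_BASES) and their values are example settings assumed for the sketch; in a real YARN deployment they are injected by the application master from the current RM address and application id, not hard-coded.

import org.apache.spark.SparkConf

// Example values only; in a running YARN app these keys are set by the AM.
val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val filterPrefix = s"spark.$filter.param."

val fresh = new SparkConf(loadDefaults = false)
  .set(filterPrefix + "PROXY_HOSTS", "rm-host")
  .set(filterPrefix + "PROXY_URI_BASES", "http://rm-host:8088/proxy/application_1234_0001")

val recovered = new SparkConf(loadDefaults = false)

// Copy every per-filter parameter so the recovered conf points at the
// current proxy, not at the one recorded before the restart.
fresh.getAll.foreach { case (k, v) =>
  if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) {
    recovered.set(k, v)
  }
}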