author    jerryshao <saisai.shao@intel.com>            2015-07-16 16:55:46 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-07-16 16:55:46 -0700
commit    031d7d41430ec1f3c3353e33eab4821a9bcd58a5 (patch)
tree      78c3abc0df6787b7910251f39f3123d2b8a35924 /streaming
parent    fec10f0c63171bd5aff7a762a6c94df035f5fb52 (diff)
[SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.
Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #5060 from jerryshao/SPARK-6304 and squashes the following commits:

89b01f5 [jerryshao] Update the unit test to add more cases
275d252 [jerryshao] Address the comments
7cc146d [jerryshao] Address the comments
2624723 [jerryshao] Fix rebase conflict
45befaa [Saisai Shao] Update the unit test
bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala       2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala  45
2 files changed, 46 insertions(+), 1 deletion(-)
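Note: the scenario behind this fix is driver recovery from a checkpoint. The sketch below is a hypothetical driver program (the application name and checkpoint path are illustrative, not taken from this patch) that restarts through StreamingContext.getOrCreate; when the context is rebuilt from the checkpoint, the recreated SparkConf should carry the new driver's spark.driver.host and spark.driver.port rather than the stale values that were written into the checkpoint.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  // Hypothetical checkpoint directory; any fault-tolerant path (e.g. on HDFS) would do.
  val checkpointDir = "/tmp/streaming-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedApp")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // ... define DStream operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a fresh start this calls createContext(); on restart it rebuilds the
    // context (and its SparkConf) from the data stored under checkpointDir.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}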
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5279331c9e..65d4e933bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
// Reload properties for the checkpoint application since user wants to set a reload property
// or spark had changed its value and user wants to set it back.
val propertiesToReload = List(
+ "spark.driver.host",
+ "spark.driver.port",
"spark.master",
"spark.yarn.keytab",
"spark.yarn.principal")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6a94928076..d308ac05a5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase {
}
}
+ // This tests that if "spark.driver.host" and "spark.driver.port" are set by the user, they
+ // can be recovered from the checkpoint with the correct values.
+ test("get correct spark.driver.[host|port] from checkpoint") {
+ val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999")
+ conf.foreach(kv => System.setProperty(kv._1, kv._2))
+ ssc = new StreamingContext(master, framework, batchDuration)
+ val originalConf = ssc.conf
+ assert(originalConf.get("spark.driver.host") === "localhost")
+ assert(originalConf.get("spark.driver.port") === "9999")
+
+ val cp = new Checkpoint(ssc, Time(1000))
+ ssc.stop()
+
+ // Serialize/deserialize to simulate write to storage and reading it back
+ val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+
+ val newCpConf = newCp.createSparkConf()
+ assert(newCpConf.contains("spark.driver.host"))
+ assert(newCpConf.contains("spark.driver.port"))
+ assert(newCpConf.get("spark.driver.host") === "localhost")
+ assert(newCpConf.get("spark.driver.port") === "9999")
+
+ // Check if all the parameters have been restored
+ ssc = new StreamingContext(null, newCp, null)
+ val restoredConf = ssc.conf
+ assert(restoredConf.get("spark.driver.host") === "localhost")
+ assert(restoredConf.get("spark.driver.port") === "9999")
+ ssc.stop()
+
+ // If spark.driver.host and spark.driver.port are not set in the system properties, these two
+ // parameters should not be present in the newly recovered conf.
+ conf.foreach(kv => System.clearProperty(kv._1))
+ val newCpConf1 = newCp.createSparkConf()
+ assert(!newCpConf1.contains("spark.driver.host"))
+ assert(!newCpConf1.contains("spark.driver.port"))
+
+ // Spark itself will pick a random, unused port for spark.driver.port if it is not set
+ // explicitly.
+ ssc = new StreamingContext(null, newCp, null)
+ val restoredConf1 = ssc.conf
+ assert(restoredConf1.get("spark.driver.host") === "localhost")
+ assert(restoredConf1.get("spark.driver.port") !== "9999")
+ }
- // This tests whether the systm can recover from a master failure with simple
+ // This tests whether the system can recover from a master failure with simple
// non-stateful operations. This assumes a reliable, replayable input
// source - TestInputDStream.
test("recovery with map and reduceByKey operations") {