diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-14 22:20:14 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-14 22:20:14 -0800 |
commit | 1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0 (patch) | |
tree | b5ece0cae499b1041d179ed34b038004fbd21e77 /streaming/src/test | |
parent | f8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 (diff) | |
download | spark-1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0.tar.gz spark-1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0.tar.bz2 spark-1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0.zip |
Changed SparkConf to not be serializable. And also fixed unit-test log paths in log4j.properties of external modules.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 89daf47586..831e7c1471 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase { val value = "myvalue" System.setProperty(key, value) ssc = new StreamingContext(master, framework, batchDuration) + val originalConf = ssc.conf + val cp = new Checkpoint(ssc, Time(1000)) - assert(!cp.sparkConf.contains("spark.driver.host")) - assert(!cp.sparkConf.contains("spark.driver.port")) - assert(!cp.sparkConf.contains("spark.hostPort")) - assert(cp.sparkConf.get(key) === value) + val cpConf = cp.sparkConf + assert(cpConf.get("spark.master") === originalConf.get("spark.master")) + assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name")) + assert(cpConf.get(key) === value) ssc.stop() + + // Serialize/deserialize to simulate write to storage and reading it back val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) - assert(!newCp.sparkConf.contains("spark.driver.host")) - assert(!newCp.sparkConf.contains("spark.driver.port")) - assert(!newCp.sparkConf.contains("spark.hostPort")) - assert(newCp.sparkConf.get(key) === value) + + val newCpConf = newCp.sparkConf + assert(newCpConf.get("spark.master") === originalConf.get("spark.master")) + assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name")) + assert(newCpConf.get(key) === value) + assert(!newCpConf.contains("spark.driver.host")) + assert(!newCpConf.contains("spark.driver.port")) + + // Check if all the parameters have been restored + ssc = new StreamingContext(null, newCp, null) + val restoredConf = ssc.conf + assert(restoredConf.get(key) === value) } |