aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-14 22:20:14 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-14 22:20:14 -0800
commit1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0 (patch)
treeb5ece0cae499b1041d179ed34b038004fbd21e77 /streaming/src/test
parentf8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 (diff)
downloadspark-1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0.tar.gz
spark-1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0.tar.bz2
spark-1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0.zip
Changed SparkConf to not be serializable. And also fixed unit-test log paths in log4j.properties of external modules.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala28
1 files changed, 20 insertions, 8 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 89daf47586..831e7c1471 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
val value = "myvalue"
System.setProperty(key, value)
ssc = new StreamingContext(master, framework, batchDuration)
+ val originalConf = ssc.conf
+
val cp = new Checkpoint(ssc, Time(1000))
- assert(!cp.sparkConf.contains("spark.driver.host"))
- assert(!cp.sparkConf.contains("spark.driver.port"))
- assert(!cp.sparkConf.contains("spark.hostPort"))
- assert(cp.sparkConf.get(key) === value)
+ val cpConf = cp.sparkConf
+ assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+ assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+ assert(cpConf.get(key) === value)
ssc.stop()
+
+ // Serialize/deserialize to simulate write to storage and reading it back
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(!newCp.sparkConf.contains("spark.driver.host"))
- assert(!newCp.sparkConf.contains("spark.driver.port"))
- assert(!newCp.sparkConf.contains("spark.hostPort"))
- assert(newCp.sparkConf.get(key) === value)
+
+ val newCpConf = newCp.sparkConf
+ assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+ assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+ assert(newCpConf.get(key) === value)
+ assert(!newCpConf.contains("spark.driver.host"))
+ assert(!newCpConf.contains("spark.driver.port"))
+
+ // Check if all the parameters have been restored
+ ssc = new StreamingContext(null, newCp, null)
+ val restoredConf = ssc.conf
+ assert(restoredConf.get(key) === value)
}