aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-22 00:43:31 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-22 00:43:31 -0800
commit364cdb679cf2b0d5e6ed7ab89628f15594d7947f (patch)
tree4d9687703a2c6249c2bcee762f575779e9d1155b /streaming/src/test
parent86057ec7c868262763d1e31b3f3c94bd43eeafb3 (diff)
downloadspark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.gz
spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.bz2
spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.zip
Refactored DStreamCheckpointData.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala12
1 files changed, 6 insertions, 6 deletions
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index d2f32c189b..58da4ee539 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -63,9 +63,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// then check whether some RDD has been checkpointed or not
ssc.start()
runStreamsWithRealDelay(ssc, firstNumBatches)
- logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
- stateStream.checkpointData.rdds.foreach {
+ logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@@ -74,7 +74,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
- val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
+ val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
runStreamsWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -91,8 +91,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// is present in the checkpoint data or not
ssc.start()
runStreamsWithRealDelay(ssc, 1)
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
- stateStream.checkpointData.rdds.foreach {
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),