diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 00:43:31 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 00:43:31 -0800 |
commit | 364cdb679cf2b0d5e6ed7ab89628f15594d7947f (patch) | |
tree | 4d9687703a2c6249c2bcee762f575779e9d1155b /streaming/src/test | |
parent | 86057ec7c868262763d1e31b3f3c94bd43eeafb3 (diff) | |
download | spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.gz spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.bz2 spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.zip |
Refactored DStreamCheckpointData.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index d2f32c189b..58da4ee539 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -63,9 +63,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // then check whether some RDD has been checkpointed or not ssc.start() runStreamsWithRealDelay(ssc, firstNumBatches) - logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]") - assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure") - stateStream.checkpointData.rdds.foreach { + logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") @@ -74,7 +74,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted - val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString)) + val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) runStreamsWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() @@ -91,8 +91,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // is present in the checkpoint data or not ssc.start() runStreamsWithRealDelay(ssc, 1) - assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure") - stateStream.checkpointData.rdds.foreach { + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), |