author | uncleGen <hustyugm@gmail.com> | 2017-03-05 18:17:30 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-03-05 18:17:30 -0800 |
commit | 207067ead6db6dc87b0d144a658e2564e3280a89 (patch) | |
tree | c2408a75569c640627fc9d2fbe434c3b0c3b503d /streaming/src | |
parent | 224e0e785b4b449ea638c2629263c798116a3011 (diff) | |
[SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not filter checkpointFilesOfLatestTime with the PATH string.
## What changes were proposed in this pull request?
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/
```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code
passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds.
Last failure message: 8 did not equal 2.
at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite.scala:172)
at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211)
```
The check condition that times out is:
```
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
  _.toString.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.
assert(checkpointFilesOfLatestTime.size === 2)
```
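The full path string may itself contain the value of `clock.getTimeMillis.toString`, for example `3500`, which here appears in the directory name shared by every checkpoint file: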
```
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500
                                                       ▲▲▲▲
```
So the filter should check only the file name, not the whole path.
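The difference can be reproduced outside Spark with a small sketch. It is an illustration only, under these assumptions: the object name `CheckpointNameFilterSketch` is hypothetical, the paths are the ones listed above with the `file:` scheme dropped, `latestTime` stands in for `clock.getTimeMillis.toString`, and `java.nio.file.Paths` is used in place of the Hadoop `Path.getName` call that the patch relies on.

```scala
import java.nio.file.Paths

// Hypothetical stand-alone sketch; not part of CheckpointSuite.
object CheckpointNameFilterSketch {
  // Directory copied from the listing above. Its name contains "3500".
  val dir = "/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15"

  // The eight checkpoint files from the listing above.
  val files: Seq[String] =
    Seq("500", "1000", "1500", "2000", "2500", "3000", "3500.bk", "3500")
      .map(suffix => s"$dir/checkpoint-$suffix")

  // Stands in for clock.getTimeMillis.toString.
  val latestTime = "3500"

  // Old check: substring match against the whole path.
  val byFullPath: Seq[String] = files.filter(_.contains(latestTime))

  // New check: substring match against the last path component only.
  val byFileName: Seq[String] =
    files.filter(p => Paths.get(p).getFileName.toString.contains(latestTime))

  def main(args: Array[String]): Unit = {
    println(s"full path matches: ${byFullPath.size}")
    println(s"file name matches: ${byFileName.size}")
  }
}
```

With these inputs the full-path filter matches all 8 files, which corresponds to the `8 did not equal 2` failure message, while the file-name filter matches only `checkpoint-3500.bk` and `checkpoint-3500`.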
## How was this patch tested?
Jenkins.
Author: uncleGen <hustyugm@gmail.com>
Closes #17167 from uncleGen/flaky-CheckpointSuite.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 |
1 files changed, 1 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 7fcf45e7de..ee2fd45a7e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
       stopSparkContext: Boolean
     ): Seq[Seq[V]] = {
     try {
-      val batchDuration = ssc.graph.batchDuration
       val batchCounter = new BatchCounter(ssc)
       ssc.start()
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      val currentTime = clock.getTimeMillis()
 
       logInfo("Manual clock before advancing = " + clock.getTimeMillis())
       clock.setTime(targetBatchTime.milliseconds)
@@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
 
       eventually(timeout(10 seconds)) {
         val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
-          _.toString.contains(clock.getTimeMillis.toString)
+          _.getName.contains(clock.getTimeMillis.toString)
         }
         // Checkpoint files are written twice for every batch interval. So assert that both
         // are written to make sure that both of them have been written.