aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authoruncleGen <hustyugm@gmail.com>2017-03-05 18:17:30 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-03-05 18:17:30 -0800
commit207067ead6db6dc87b0d144a658e2564e3280a89 (patch)
treec2408a75569c640627fc9d2fbe434c3b0c3b503d /streaming
parent224e0e785b4b449ea638c2629263c798116a3011 (diff)
downloadspark-207067ead6db6dc87b0d144a658e2564e3280a89.tar.gz
spark-207067ead6db6dc87b0d144a658e2564e3280a89.tar.bz2
spark-207067ead6db6dc87b0d144a658e2564e3280a89.zip
[SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not filter checkpointFilesOfLatestTime with the PATH string.
## What changes were proposed in this pull request? https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/ ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds. Last failure message: 8 did not equal 2. at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420) at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478) at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478) at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite .scala:172) at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211) ``` the check condition is: ``` val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter { _.toString.contains(clock.getTimeMillis.toString) } // Checkpoint files are written twice for every batch interval. So assert that both // are written to make sure that both of them have been written. assert(checkpointFilesOfLatestTime.size === 2) ``` the path string may contain the `clock.getTimeMillis.toString`, like `3500` : ``` file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500 ▲▲▲▲ ``` so we should only check the filename, but not the whole path. ## How was this patch tested? Jenkins. Author: uncleGen <hustyugm@gmail.com> Closes #17167 from uncleGen/flaky-CheckpointSuite.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala4
1 files changed, 1 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 7fcf45e7de..ee2fd45a7e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
stopSparkContext: Boolean
): Seq[Seq[V]] = {
try {
- val batchDuration = ssc.graph.batchDuration
val batchCounter = new BatchCounter(ssc)
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val currentTime = clock.getTimeMillis()
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
clock.setTime(targetBatchTime.milliseconds)
@@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
eventually(timeout(10 seconds)) {
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
- _.toString.contains(clock.getTimeMillis.toString)
+ _.getName.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.