aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala4
1 files changed, 1 insertions, 3 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 7fcf45e7de..ee2fd45a7e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
stopSparkContext: Boolean
): Seq[Seq[V]] = {
try {
- val batchDuration = ssc.graph.batchDuration
val batchCounter = new BatchCounter(ssc)
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val currentTime = clock.getTimeMillis()
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
clock.setTime(targetBatchTime.milliseconds)
@@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
eventually(timeout(10 seconds)) {
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
- _.toString.contains(clock.getTimeMillis.toString)
+ _.getName.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.