aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-07-09 13:22:17 -0700
committerAndrew Or <andrew@databricks.com>2015-07-09 13:22:17 -0700
commit88bf430331eef3c02438ca441616034486e15789 (patch)
treecb210aba82e3460d48af4fec14d8ceb5ac914256 /streaming
parent930fe95350f8865e2af2d7afa5b717210933cd43 (diff)
downloadspark-88bf430331eef3c02438ca441616034486e15789.tar.gz
spark-88bf430331eef3c02438ca441616034486e15789.tar.bz2
spark-88bf430331eef3c02438ca441616034486e15789.zip
[SPARK-7419] [STREAMING] [TESTS] Fix CheckpointSuite.recovery with file input stream
Fix this failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/2886/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.3,label=centos/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_with_file_input_stream/ To reproduce this failure, you can add `Thread.sleep(2000)` before this line https://github.com/apache/spark/blob/a9c4e29950a14e32acaac547e9a0e8879fd37fc9/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala#L477 Author: zsxwing <zsxwing@gmail.com> Closes #7323 from zsxwing/SPARK-7419 and squashes the following commits: b3caf58 [zsxwing] Fix CheckpointSuite.recovery with file input stream
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala18
1 files changed, 10 insertions, 8 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6b0a3f91d4..6a94928076 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -424,11 +424,11 @@ class CheckpointSuite extends TestSuiteBase {
}
}
}
- clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
// Wait until all files have been recorded and all batches have started
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
}
+ clock.advance(batchDuration.milliseconds)
// Wait for a checkpoint to be written
eventually(eventuallyTimeout) {
assert(Checkpoint.getCheckpointFiles(checkpointDir).size === 6)
@@ -454,9 +454,12 @@ class CheckpointSuite extends TestSuiteBase {
// recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
- // So that the restarted StreamingContext's clock has gone forward in time since failure
- ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
- val oldClockTime = clock.getTimeMillis()
+ // "batchDuration.milliseconds * 3" has gone before restarting StreamingContext. And because
+ // the recovery time is read from the checkpoint time but the original clock doesn't align
+ // with the batch time, we need to add the offset "batchDuration.milliseconds / 2".
+ ssc.conf.set("spark.streaming.manualClock.jump",
+ (batchDuration.milliseconds / 2 + batchDuration.milliseconds * 3).toString)
+ val oldClockTime = clock.getTimeMillis() // 15000ms
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val batchCounter = new BatchCounter(ssc)
val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
@@ -467,10 +470,10 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
// Verify that the clock has traveled forward to the expected time
eventually(eventuallyTimeout) {
- clock.getTimeMillis() === oldClockTime
+ assert(clock.getTimeMillis() === oldClockTime)
}
- // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
- val numBatchesAfterRestart = 4
+ // There are 5 batches between 6000ms and 15000ms (inclusive).
+ val numBatchesAfterRestart = 5
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
}
@@ -483,7 +486,6 @@ class CheckpointSuite extends TestSuiteBase {
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
}
}
- clock.advance(batchDuration.milliseconds)
logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()