diff options
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 7 |
1 files changed, 3 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 315bd5443c..2a41ec0035 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -42,8 +42,6 @@ import org.apache.spark.streaming.util.ManualClock */ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - before { FileUtils.deleteDirectory(new File(checkpointDir)) } @@ -135,13 +133,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Restart stream computation from the new checkpoint file to see whether that file has // correct checkpoint data - conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) ssc = new StreamingContext(checkpointDir) stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") - // Adjust manual clock time as if it is being restarted after a delay + // Adjust manual clock time as if it is being restarted after a delay; this is a hack because + // we modify the conf object, but it works for this one property + ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) ssc.start() advanceTimeWithRealDelay(ssc, 4) ssc.stop() |