diff options
author | Matei Zaharia <matei@databricks.com> | 2013-12-28 21:26:43 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2013-12-28 21:26:43 -0500 |
commit | a8f316386a429c6d73e3e3824ea6eb28b0381cb5 (patch) | |
tree | 57524ec56196cb272731f57e09c66fc798ed8514 /streaming | |
parent | 578bd1fc28513eb84002c604000250f5cff9b815 (diff) | |
download | spark-a8f316386a429c6d73e3e3824ea6eb28b0381cb5.tar.gz spark-a8f316386a429c6d73e3e3824ea6eb28b0381cb5.tar.bz2 spark-a8f316386a429c6d73e3e3824ea6eb28b0381cb5.zip |
Fix CheckpointSuite test failures
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 7 |
1 files changed, 3 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 315bd5443c..2a41ec0035 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -42,8 +42,6 @@ import org.apache.spark.streaming.util.ManualClock */ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") - before { FileUtils.deleteDirectory(new File(checkpointDir)) } @@ -135,13 +133,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Restart stream computation from the new checkpoint file to see whether that file has // correct checkpoint data - conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) ssc = new StreamingContext(checkpointDir) stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") - // Adjust manual clock time as if it is being restarted after a delay + // Adjust manual clock time as if it is being restarted after a delay; this is a hack because + // we modify the conf object, but it works for this one property + ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) ssc.start() advanceTimeWithRealDelay(ssc, 4) ssc.stop() |