aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-28 21:26:43 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-28 21:26:43 -0500
commita8f316386a429c6d73e3e3824ea6eb28b0381cb5 (patch)
tree57524ec56196cb272731f57e09c66fc798ed8514 /streaming
parent578bd1fc28513eb84002c604000250f5cff9b815 (diff)
downloadspark-a8f316386a429c6d73e3e3824ea6eb28b0381cb5.tar.gz
spark-a8f316386a429c6d73e3e3824ea6eb28b0381cb5.tar.bz2
spark-a8f316386a429c6d73e3e3824ea6eb28b0381cb5.zip
Fix CheckpointSuite test failures
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala7
1 files changed, 3 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 315bd5443c..2a41ec0035 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -42,8 +42,6 @@ import org.apache.spark.streaming.util.ManualClock
*/
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
@@ -135,13 +133,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Restart stream computation from the new checkpoint file to see whether that file has
// correct checkpoint data
- conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
- // Adjust manual clock time as if it is being restarted after a delay
+ // Adjust manual clock time as if it is being restarted after a delay; this is a hack because
+ // we modify the conf object, but it works for this one property
+ ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()