aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-11-12 14:22:05 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-11-12 14:22:05 -0800
commit564dd8c3f415746a68f05bde6ea2a0e7a7760b4c (patch)
tree6c6fa15d157bd0693aa0e91608b8a15470b325a4 /streaming
parentb9bfd1456f09f4db281fb9d108a339c59a2e2dda (diff)
downloadspark-564dd8c3f415746a68f05bde6ea2a0e7a7760b4c.tar.gz
spark-564dd8c3f415746a68f05bde6ea2a0e7a7760b4c.tar.bz2
spark-564dd8c3f415746a68f05bde6ea2a0e7a7760b4c.zip
Speeded up CheckpointSuite
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala26
1 files changed, 14 insertions, 12 deletions
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 038827ddb0..0ad57e38b9 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -15,12 +15,16 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
after {
+
+ if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
}
+ var ssc: StreamingContext = null
+
override def framework = "CheckpointSuite"
- override def batchDuration = Milliseconds(500)
+ override def batchDuration = Milliseconds(200)
override def checkpointDir = "checkpoint"
@@ -30,12 +34,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
test("basic stream+rdd recovery") {
- assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
+ assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 1 second")
assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
- val stateStreamCheckpointInterval = Seconds(2)
+ val stateStreamCheckpointInterval = Seconds(1)
// this ensure checkpointing occurs at least once
val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
@@ -110,6 +114,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
runStreamsWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
+ ssc = null
}
test("map and reduceByKey") {
@@ -131,9 +136,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
.checkpoint(Seconds(2))
}
- for (i <- Seq(2, 3, 4)) {
- testCheckpointedOperation(input, operation, output, i)
- }
+ testCheckpointedOperation(input, operation, output, 3)
}
test("updateStateByKey") {
@@ -148,9 +151,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
.checkpoint(Seconds(2))
.map(t => (t._1, t._2.self))
}
- for (i <- Seq(2, 3, 4)) {
- testCheckpointedOperation(input, operation, output, i)
- }
+ testCheckpointedOperation(input, operation, output, 3)
}
@@ -171,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do half the computation (half the number of batches), create checkpoint file and quit
- val ssc = setupStreams[U, V](input, operation)
+ ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -182,9 +183,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
" Restarting stream computation " +
"\n-------------------------------------------\n"
)
- val sscNew = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](sscNew, nextNumBatches, nextNumExpectedOutputs)
+ ssc = new StreamingContext(checkpointDir)
+ val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc = null
}
/**