path: root/streaming/src/test
author     Tathagata Das <tathagata.das1565@gmail.com>  2014-01-13 14:57:07 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-01-13 14:57:07 -0800
commit     27311b13321ba60ee1324b86234f0aaf63df9f67 (patch)
tree       c2b813a32ac9e019ae67399ad58d120c8f03d813 /streaming/src/test
parent     b93f9d42f21f03163734ef97b2871db945e166da (diff)
Added unpersisting and modified the test suite to better test metadata cleaning.
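For context, the cleanup exercised here works roughly as follows: each DStream keeps its generated RDDs in a time-indexed map, and once an RDD falls outside the stream's rememberDuration it is dropped from that map and, with this change, also unpersisted. A minimal sketch of the idea, not the actual DStream implementation (generatedRDDs and rememberDuration mirror the names the tests assert on):

// Hedged sketch of time-based DStream metadata cleanup. Assumes
// generatedRDDs: mutable.HashMap[Time, RDD[T]] and rememberDuration: Duration,
// the fields the assertions in the tests below inspect.
def clearMetadata(time: Time): Unit = {
  val oldRDDs = generatedRDDs.filter { case (t, _) => t <= (time - rememberDuration) }
  generatedRDDs --= oldRDDs.keys                          // forget the metadata
  oldRDDs.values.foreach(_.unpersist(blocking = false))   // new: also release cached blocks
}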
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 72
1 file changed, 49 insertions(+), 23 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 7037aae234..b73edf81d4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._
import util.ManualClock
import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
class BasicOperationsSuite extends TestSuiteBase {
test("map") {
@@ -395,40 +397,31 @@ class BasicOperationsSuite extends TestSuiteBase {
Thread.sleep(1000)
}
- test("forgetting of RDDs - map and window operations") {
- assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+ val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
- val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+ test("rdd cleanup - map and window") {
val rememberDuration = Seconds(3)
-
- assert(input.size === 10, "Number of inputs have changed")
-
def operation(s: DStream[Int]): DStream[(Int, Int)] = {
s.map(x => (x % 10, 1))
.window(Seconds(2), Seconds(1))
.window(Seconds(4), Seconds(2))
}
- val ssc = setupStreams(input, operation _)
- ssc.remember(rememberDuration)
- runStreams[(Int, Int)](ssc, input.size, input.size / 2)
-
- val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
- val windowedStream1 = windowedStream2.dependencies.head
+ val operatedStream = runCleanupTest(conf, operation _,
+ numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = rememberDuration)
+ val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+ val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
val mappedStream = windowedStream1.dependencies.head
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- assert(clock.time === Seconds(10).milliseconds)
-
- // IDEALLY
- // WindowedStream2 should remember till 7 seconds: 10, 8,
- // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
- // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3,
+ // Check the remember durations
+ assert(windowedStream2.rememberDuration === rememberDuration)
+ assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+ assert(mappedStream.rememberDuration ===
+ rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
- // IN THIS TEST
- // WindowedStream2 should remember till 7 seconds: 10, 8,
+ // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
// WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
- // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
+ // MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2
// WindowedStream2
assert(windowedStream2.generatedRDDs.contains(Time(10000)))
@@ -445,4 +438,37 @@ class BasicOperationsSuite extends TestSuiteBase {
assert(mappedStream.generatedRDDs.contains(Time(2000)))
assert(!mappedStream.generatedRDDs.contains(Time(1000)))
}
+
+ test("rdd cleanup - updateStateByKey") {
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+ }
+ val stateStream = runCleanupTest(
+ conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
+
+ assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+ assert(stateStream.generatedRDDs.contains(Time(10000)))
+ assert(!stateStream.generatedRDDs.contains(Time(4000)))
+ }
+
+ /** Test cleanup of RDDs in DStream metadata */
+ def runCleanupTest[T: ClassTag](
+ conf2: SparkConf,
+ operation: DStream[Int] => DStream[T],
+ numExpectedOutput: Int = cleanupTestInput.size,
+ rememberDuration: Duration = null
+ ): DStream[T] = {
+
+ // Set up the stream computation
+ assert(batchDuration === Seconds(1),
+ "Batch duration has changed from 1 second, check cleanup tests")
+ val ssc = setupStreams(cleanupTestInput, operation)
+ val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
+ if (rememberDuration != null) ssc.remember(rememberDuration)
+ val output = runStreams[T](ssc, cleanupTestInput.size, numExpectedOutput)
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ assert(clock.time === Seconds(10).milliseconds)
+ assert(output.size === numExpectedOutput)
+ operatedStream
+ }
}
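A note on the remember durations asserted in the map-and-window test: a WindowedDStream needs its parent to keep RDDs long enough to recompute its oldest remembered window, so the required rememberDuration grows by the child's windowDuration at each level of the dependency chain. Worked out for this test (illustrative arithmetic only; the values match the assertions above):

// rememberDuration = Seconds(3); the two windows are
// window(Seconds(2), Seconds(1)) and then window(Seconds(4), Seconds(2)).
windowedStream2.rememberDuration   // Seconds(3)
windowedStream1.rememberDuration   // Seconds(3) + Seconds(4) = Seconds(7)
mappedStream.rememberDuration      // Seconds(7) + Seconds(2) = Seconds(9)
// With the manual clock stopped at 10 seconds, the streams therefore
// retain RDDs for times 10..7, 10..4, and 10..2 respectively,
// which is what the generatedRDDs checks verify.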