diff options
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 72 |
1 files changed, 49 insertions, 23 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 7037aae234..b73edf81d4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._ import util.ManualClock import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.dstream.{WindowedDStream, DStream} +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.reflect.ClassTag class BasicOperationsSuite extends TestSuiteBase { test("map") { @@ -395,40 +397,31 @@ class BasicOperationsSuite extends TestSuiteBase { Thread.sleep(1000) } - test("forgetting of RDDs - map and window operations") { - assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second") + val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq - val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq + test("rdd cleanup - map and window") { val rememberDuration = Seconds(3) - - assert(input.size === 10, "Number of inputs have changed") - def operation(s: DStream[Int]): DStream[(Int, Int)] = { s.map(x => (x % 10, 1)) .window(Seconds(2), Seconds(1)) .window(Seconds(4), Seconds(2)) } - val ssc = setupStreams(input, operation _) - ssc.remember(rememberDuration) - runStreams[(Int, Int)](ssc, input.size, input.size / 2) - - val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head - val windowedStream1 = windowedStream2.dependencies.head + val operatedStream = runCleanupTest(conf, operation _, + numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3)) + val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] + val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] val mappedStream = windowedStream1.dependencies.head - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - assert(clock.time === Seconds(10).milliseconds) - - // IDEALLY - // WindowedStream2 should remember till 7 seconds: 10, 8, - // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5 - // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, + // Checkpoint remember durations + assert(windowedStream2.rememberDuration === rememberDuration) + assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration) + assert(mappedStream.rememberDuration === + rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration) - // IN THIS TEST - // WindowedStream2 should remember till 7 seconds: 10, 8, + // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7 // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 - // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 + // MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 // WindowedStream2 assert(windowedStream2.generatedRDDs.contains(Time(10000))) @@ -445,4 +438,37 @@ class BasicOperationsSuite extends TestSuiteBase { assert(mappedStream.generatedRDDs.contains(Time(2000))) assert(!mappedStream.generatedRDDs.contains(Time(1000))) } + + test("rdd cleanup - updateStateByKey") { + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some(values.foldLeft(0)(_ + _) + state.getOrElse(0)) + } + val stateStream = runCleanupTest( + conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3))) + + assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2) + assert(stateStream.generatedRDDs.contains(Time(10000))) + assert(!stateStream.generatedRDDs.contains(Time(4000))) + } + + /** Test cleanup of RDDs in DStream metadata */ + def runCleanupTest[T: ClassTag]( + conf2: SparkConf, + operation: DStream[Int] => DStream[T], + numExpectedOutput: Int = cleanupTestInput.size, + rememberDuration: Duration = null + ): DStream[T] = { + + // Setup the stream computation + assert(batchDuration === Seconds(1), + "Batch duration has changed from 1 second, check cleanup tests") + val ssc = setupStreams(cleanupTestInput, operation) + val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]] + if (rememberDuration != null) ssc.remember(rememberDuration) + val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput) + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + assert(clock.time === Seconds(10).milliseconds) + assert(output.size === numExpectedOutput) + operatedStream + } } |