Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 72
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala | 40
2 files changed, 89 insertions, 23 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index cb53555f5c..bcb0c28bf0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.SparkContext._
 import util.ManualClock
 import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
 
 class BasicOperationsSuite extends TestSuiteBase {
 
   test("map") {
@@ -394,40 +396,31 @@ class BasicOperationsSuite extends TestSuiteBase {
     Thread.sleep(1000)
   }
 
-  test("forgetting of RDDs - map and window operations") {
-    assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
+  val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
 
-    val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq
+  test("rdd cleanup - map and window") {
     val rememberDuration = Seconds(3)
-
-    assert(input.size === 10, "Number of inputs have changed")
-
     def operation(s: DStream[Int]): DStream[(Int, Int)] = {
       s.map(x => (x % 10, 1))
        .window(Seconds(2), Seconds(1))
        .window(Seconds(4), Seconds(2))
     }
 
-    val ssc = setupStreams(input, operation _)
-    ssc.remember(rememberDuration)
-    runStreams[(Int, Int)](ssc, input.size, input.size / 2)
-
-    val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head
-    val windowedStream1 = windowedStream2.dependencies.head
+    val operatedStream = runCleanupTest(conf, operation _,
+      numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3))
+    val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
+    val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
     val mappedStream = windowedStream1.dependencies.head
 
-    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    assert(clock.time === Seconds(10).milliseconds)
-
-    // IDEALLY
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
-    // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3,
+    // Checkpoint remember durations
+    assert(windowedStream2.rememberDuration === rememberDuration)
+    assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration)
+    assert(mappedStream.rememberDuration ===
+      rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration)
 
-    // IN THIS TEST
-    // WindowedStream2 should remember till 7 seconds: 10, 8,
+    // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7
     // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4
-    // MappedStream should remember till 7 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
+    // MappedStream should remember till 2 seconds:    10, 9, 8, 7, 6, 5, 4, 3, 2
 
     // WindowedStream2
     assert(windowedStream2.generatedRDDs.contains(Time(10000)))
@@ -444,4 +437,37 @@ class BasicOperationsSuite extends TestSuiteBase {
     assert(mappedStream.generatedRDDs.contains(Time(2000)))
     assert(!mappedStream.generatedRDDs.contains(Time(1000)))
   }
+
+  test("rdd cleanup - updateStateByKey") {
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
+    }
+    val stateStream = runCleanupTest(
+      conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3)))
+
+    assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2)
+    assert(stateStream.generatedRDDs.contains(Time(10000)))
+    assert(!stateStream.generatedRDDs.contains(Time(4000)))
+  }
+
+  /** Test cleanup of RDDs in DStream metadata */
+  def runCleanupTest[T: ClassTag](
+      conf2: SparkConf,
+      operation: DStream[Int] => DStream[T],
+      numExpectedOutput: Int = cleanupTestInput.size,
+      rememberDuration: Duration = null
+    ): DStream[T] = {
+
+    // Setup the stream computation
+    assert(batchDuration === Seconds(1),
+      "Batch duration has changed from 1 second, check cleanup tests")
+    val ssc = setupStreams(cleanupTestInput, operation)
+    val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]]
+    if (rememberDuration != null) ssc.remember(rememberDuration)
+    val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    assert(clock.time === Seconds(10).milliseconds)
+    assert(output.size === numExpectedOutput)
+    operatedStream
+  }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
new file mode 100644
index 0000000000..15f13d5b19
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import org.scalatest.FunSuite
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStreamSuite extends FunSuite {
+
+  private def benchmark[U](f: => U): Long = {
+    val start = System.nanoTime
+    f
+    System.nanoTime - start
+  }
+
+  test("write") {
+    val underlying = new ByteArrayOutputStream
+    val data = "X" * 41000
+    val stream = new RateLimitedOutputStream(underlying, 10000)
+    val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
+    assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
+    assert(underlying.toString("UTF-8") == data)
+  }
+}
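
The remember-duration asserts in the "rdd cleanup - map and window" test encode a simple propagation rule: a stream keeps RDDs for its own remember duration, and each windowed stream additionally forces its parent to remember its whole window. A standalone sketch of that arithmetic (plain Scala with illustrative names, not Spark's DStream classes):

object RememberPropagationSketch extends App {
  // Chain from the test, child first: the output depends on
  // windowedStream2 (window 4s), which depends on windowedStream1
  // (window 2s), which depends on mappedStream.
  val windows = Seq("windowedStream2" -> 4, "windowedStream1" -> 2)

  var rememberSec = 3 // set by ssc.remember(Seconds(3)) in the test
  for ((name, windowSec) <- windows) {
    println(s"$name remembers ${rememberSec}s of RDDs")
    rememberSec += windowSec // a parent must also cover this stream's window
  }
  println(s"mappedStream remembers ${rememberSec}s of RDDs") // 3 + 4 + 2 = 9
}

Run as a script, this prints 3s, 7s, and 9s, matching the three rememberDuration asserts in the test.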
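
In the new RateLimitedOutputStreamSuite, the expected timing follows directly from the arithmetic: 41000 bytes through a 10000 bytes/sec limit take about 4.1 seconds, which SECONDS.convert truncates to 4. A rough sketch of the kind of throttling the test exercises (an assumed implementation for illustration; Spark's actual RateLimitedOutputStream may differ):

import java.io.{ByteArrayOutputStream, OutputStream}

// Throttles writes so total bytes never outpace bytesPerSec.
class ThrottledOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
  private val startNs = System.nanoTime
  private var written = 0L

  override def write(b: Int): Unit = {
    waitForRate(1)
    out.write(b)
    written += 1
  }

  override def write(bytes: Array[Byte], offset: Int, length: Int): Unit = {
    var pos = offset
    while (pos < offset + length) {
      // Never burst more than one second's quota at a time.
      val chunk = math.min(bytesPerSec, offset + length - pos)
      waitForRate(chunk)
      out.write(bytes, pos, chunk)
      written += chunk
      pos += chunk
    }
  }

  // Sleep until writing n more bytes keeps us at or under the target rate.
  private def waitForRate(n: Int): Unit = {
    val targetNs = (written + n) * 1000000000L / bytesPerSec
    val elapsedNs = System.nanoTime - startNs
    if (targetNs > elapsedNs) Thread.sleep((targetNs - elapsedNs) / 1000000L)
  }
}

object ThrottleDemo extends App {
  val sink = new ByteArrayOutputStream
  val stream = new ThrottledOutputStream(sink, 10000)
  val t0 = System.nanoTime
  stream.write(("X" * 41000).getBytes("UTF-8"))
  println(f"${sink.size} bytes in ${(System.nanoTime - t0) / 1e9}%.1f s") // ~4.1 s
}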