diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-21 11:59:21 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-21 11:59:21 -0800 |
commit | 078c71c2dcbb1470d22f8eb8138fb17e3d7c2414 (patch) | |
tree | 06a3b9e566014326fa32e6332eb52357bca3228e /streaming/src | |
parent | ccfe60a8304871779ff1b31b8c2d724f59d5b2af (diff) | |
download | spark-078c71c2dcbb1470d22f8eb8138fb17e3d7c2414.tar.gz spark-078c71c2dcbb1470d22f8eb8138fb17e3d7c2414.tar.bz2 spark-078c71c2dcbb1470d22f8eb8138fb17e3d7c2414.zip |
[SPARK-18954][TESTS] Fix flaky test: o.a.s.streaming.BasicOperationsSuite rdd cleanup - map and window
## What changes were proposed in this pull request?
The issue in this test is the cleanup of RDDs may not be able to finish before stopping StreamingContext. This PR basically just puts the assertions into `eventually` and runs it before stopping StreamingContext.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16362 from zsxwing/SPARK-18954.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 98 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 19 |
2 files changed, 73 insertions, 44 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 4e702bbb92..a3062ac946 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials import scala.reflect.ClassTag +import org.scalatest.concurrent.Eventually.eventually + import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, WindowedDStream} @@ -657,48 +657,57 @@ class BasicOperationsSuite extends TestSuiteBase { .window(Seconds(4), Seconds(2)) } - val operatedStream = runCleanupTest(conf, operation _, - numExpectedOutput = cleanupTestInput.size / 2, rememberDuration = Seconds(3)) - val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] - val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] - val mappedStream = windowedStream1.dependencies.head - - // Checkpoint remember durations - assert(windowedStream2.rememberDuration === rememberDuration) - assert(windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration) - assert(mappedStream.rememberDuration === - rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration) - - // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7 - // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 - // MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 - - // WindowedStream2 - assert(windowedStream2.generatedRDDs.contains(Time(10000))) - assert(windowedStream2.generatedRDDs.contains(Time(8000))) - assert(!windowedStream2.generatedRDDs.contains(Time(6000))) - - // WindowedStream1 - assert(windowedStream1.generatedRDDs.contains(Time(10000))) - assert(windowedStream1.generatedRDDs.contains(Time(4000))) - assert(!windowedStream1.generatedRDDs.contains(Time(3000))) - - // MappedStream - assert(mappedStream.generatedRDDs.contains(Time(10000))) - assert(mappedStream.generatedRDDs.contains(Time(2000))) - assert(!mappedStream.generatedRDDs.contains(Time(1000))) + runCleanupTest( + conf, + operation _, + numExpectedOutput = cleanupTestInput.size / 2, + rememberDuration = Seconds(3)) { operatedStream => + eventually(eventuallyTimeout) { + val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]] + val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]] + val mappedStream = windowedStream1.dependencies.head + + // Checkpoint remember durations + assert(windowedStream2.rememberDuration === rememberDuration) + assert( + windowedStream1.rememberDuration === rememberDuration + windowedStream2.windowDuration) + assert(mappedStream.rememberDuration === + rememberDuration + windowedStream2.windowDuration + windowedStream1.windowDuration) + + // WindowedStream2 should remember till 7 seconds: 10, 9, 8, 7 + // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 + // MappedStream should remember till 2 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 + + // WindowedStream2 + assert(windowedStream2.generatedRDDs.contains(Time(10000))) + assert(windowedStream2.generatedRDDs.contains(Time(8000))) + assert(!windowedStream2.generatedRDDs.contains(Time(6000))) + + // WindowedStream1 + assert(windowedStream1.generatedRDDs.contains(Time(10000))) + assert(windowedStream1.generatedRDDs.contains(Time(4000))) + assert(!windowedStream1.generatedRDDs.contains(Time(3000))) + + // MappedStream + assert(mappedStream.generatedRDDs.contains(Time(10000))) + assert(mappedStream.generatedRDDs.contains(Time(2000))) + assert(!mappedStream.generatedRDDs.contains(Time(1000))) + } + } } test("rdd cleanup - updateStateByKey") { val updateFunc = (values: Seq[Int], state: Option[Int]) => { Some(values.sum + state.getOrElse(0)) } - val stateStream = runCleanupTest( - conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3))) - - assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2) - assert(stateStream.generatedRDDs.contains(Time(10000))) - assert(!stateStream.generatedRDDs.contains(Time(4000))) + runCleanupTest( + conf, _.map(_ -> 1).updateStateByKey(updateFunc).checkpoint(Seconds(3))) { stateStream => + eventually(eventuallyTimeout) { + assert(stateStream.rememberDuration === stateStream.checkpointDuration * 2) + assert(stateStream.generatedRDDs.contains(Time(10000))) + assert(!stateStream.generatedRDDs.contains(Time(4000))) + } + } } test("rdd cleanup - input blocks and persisted RDDs") { @@ -779,13 +788,16 @@ class BasicOperationsSuite extends TestSuiteBase { } } - /** Test cleanup of RDDs in DStream metadata */ + /** + * Test cleanup of RDDs in DStream metadata. `assertCleanup` is the function that asserts the + * cleanup of RDDs is successful. + */ def runCleanupTest[T: ClassTag]( conf2: SparkConf, operation: DStream[Int] => DStream[T], numExpectedOutput: Int = cleanupTestInput.size, rememberDuration: Duration = null - ): DStream[T] = { + )(assertCleanup: (DStream[T]) => Unit): DStream[T] = { // Setup the stream computation assert(batchDuration === Seconds(1), @@ -794,7 +806,11 @@ class BasicOperationsSuite extends TestSuiteBase { val operatedStream = ssc.graph.getOutputStreams().head.dependencies.head.asInstanceOf[DStream[T]] if (rememberDuration != null) ssc.remember(rememberDuration) - val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput) + val output = runStreams[(Int, Int)]( + ssc, + cleanupTestInput.size, + numExpectedOutput, + () => assertCleanup(operatedStream)) val clock = ssc.scheduler.clock.asInstanceOf[Clock] assert(clock.getTimeMillis() === Seconds(10).milliseconds) assert(output.size === numExpectedOutput) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index fa975a1462..dbab708861 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -359,14 +359,20 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. * * Returns a sequence of items for each RDD. + * + * @param ssc The StreamingContext + * @param numBatches The number of batches should be run + * @param numExpectedOutput The number of expected output + * @param preStop The function to run before stopping StreamingContext */ def runStreams[V: ClassTag]( ssc: StreamingContext, numBatches: Int, - numExpectedOutput: Int + numExpectedOutput: Int, + preStop: () => Unit = () => {} ): Seq[Seq[V]] = { // Flatten each RDD into a single Seq - runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq) + runStreamsWithPartitions(ssc, numBatches, numExpectedOutput, preStop).map(_.flatten.toSeq) } /** @@ -376,11 +382,17 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { * * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each * representing one partition. + * + * @param ssc The StreamingContext + * @param numBatches The number of batches should be run + * @param numExpectedOutput The number of expected output + * @param preStop The function to run before stopping StreamingContext */ def runStreamsWithPartitions[V: ClassTag]( ssc: StreamingContext, numBatches: Int, - numExpectedOutput: Int + numExpectedOutput: Int, + preStop: () => Unit = () => {} ): Seq[Seq[Seq[V]]] = { assert(numBatches > 0, "Number of batches to run stream computation is zero") assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") @@ -424,6 +436,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") Thread.sleep(100) // Give some time for the forgetting old RDDs to complete + preStop() } finally { ssc.stop(stopSparkContext = true) } |