diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-10-29 17:59:16 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2014-10-29 17:59:16 -0700 |
commit | 1234258077b1f4050845e9fb73066b37f981c72a (patch) | |
tree | f9e87bcfb7b36f6c58e1a1d89ab43f724c859089 | |
parent | 8d59b37b02eb36f37bcefafb952519d7dca744ad (diff) | |
download | spark-1234258077b1f4050845e9fb73066b37f981c72a.tar.gz spark-1234258077b1f4050845e9fb73066b37f981c72a.tar.bz2 spark-1234258077b1f4050845e9fb73066b37f981c72a.zip |
[SPARK-4053][Streaming] Made the ReceiverSuite test more reliable, by fixing block generator throttling
In the unit test that checked whether blocks generated by throttled block generator had expected number of records, the thresholds are too tight, which sometimes led to the test failing.
This PR fixes it by relaxing the thresholds and the time intervals for testing.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #2900 from tdas/receiver-suite-flakiness and squashes the following commits:
28508a2 [Tathagata Das] Made the ReceiverSuite test more reliable
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala (renamed from streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala) | 44 |
1 files changed, 30 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index eb6e88cf55..0f6a9489db 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -31,9 +31,9 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ /** Testsuite for testing the network receiver behavior */ -class NetworkReceiverSuite extends FunSuite with Timeouts { +class ReceiverSuite extends FunSuite with Timeouts { - test("network receiver life cycle") { + test("receiver life cycle") { val receiver = new FakeReceiver val executor = new FakeReceiverSupervisor(receiver) @@ -152,8 +152,8 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { test("block generator throttling") { val blockGeneratorListener = new FakeBlockGeneratorListener - val blockInterval = 50 - val maxRate = 200 + val blockInterval = 100 + val maxRate = 100 val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString). set("spark.streaming.receiver.maxRate", maxRate.toString) val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) @@ -175,19 +175,35 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { } blockGenerator.stop() - val recordedData = blockGeneratorListener.arrayBuffers - assert(blockGeneratorListener.arrayBuffers.size > 0) - assert(recordedData.flatten.toSet === generatedData.toSet) + val recordedBlocks = blockGeneratorListener.arrayBuffers + val recordedData = recordedBlocks.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") + assert(recordedData.toSet === generatedData.toSet, "Received data not same") + // recordedData size should be close to the expected rate - assert(recordedData.flatten.size >= expectedMessages * 0.9 && - recordedData.flatten.size <= expectedMessages * 1.1 ) - // the first and last block may be incomplete, so we slice them out - recordedData.slice(1, recordedData.size - 1).foreach { block => - assert(block.size >= expectedMessagesPerBlock * 0.8 && - block.size <= expectedMessagesPerBlock * 1.2 ) - } + val minExpectedMessages = expectedMessages - 3 + val maxExpectedMessages = expectedMessages + 1 + val numMessages = recordedData.size + assert( + numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, + s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" + ) + + val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3 + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1 + val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") + println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes) + assert( + // the first and last block may be incomplete, so we slice them out + recordedBlocks.drop(1).dropRight(1).forall { block => + block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock + }, + s"# records in received blocks = [$receivedBlockSizes], not between " + + s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock" + ) } + /** * An implementation of NetworkReceiver that is used for testing a receiver's life cycle. */ |