aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-10-29 17:59:16 -0700
committerAndrew Or <andrew@databricks.com>2014-10-29 17:59:16 -0700
commit1234258077b1f4050845e9fb73066b37f981c72a (patch)
treef9e87bcfb7b36f6c58e1a1d89ab43f724c859089 /streaming/src
parent8d59b37b02eb36f37bcefafb952519d7dca744ad (diff)
downloadspark-1234258077b1f4050845e9fb73066b37f981c72a.tar.gz
spark-1234258077b1f4050845e9fb73066b37f981c72a.tar.bz2
spark-1234258077b1f4050845e9fb73066b37f981c72a.zip
[SPARK-4053][Streaming] Made the ReceiverSuite test more reliable, by fixing block generator throttling
In the unit test that checked whether blocks generated by throttled block generator had expected number of records, the thresholds are too tight, which sometimes led to the test failing. This PR fixes it by relaxing the thresholds and the time intervals for testing. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #2900 from tdas/receiver-suite-flakiness and squashes the following commits: 28508a2 [Tathagata Das] Made the ReceiverSuite test more reliable
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala (renamed from streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala)44
1 files changed, 30 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index eb6e88cf55..0f6a9489db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -31,9 +31,9 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
/** Testsuite for testing the network receiver behavior */
-class NetworkReceiverSuite extends FunSuite with Timeouts {
+class ReceiverSuite extends FunSuite with Timeouts {
- test("network receiver life cycle") {
+ test("receiver life cycle") {
val receiver = new FakeReceiver
val executor = new FakeReceiverSupervisor(receiver)
@@ -152,8 +152,8 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
test("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener
- val blockInterval = 50
- val maxRate = 200
+ val blockInterval = 100
+ val maxRate = 100
val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
@@ -175,19 +175,35 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
}
blockGenerator.stop()
- val recordedData = blockGeneratorListener.arrayBuffers
- assert(blockGeneratorListener.arrayBuffers.size > 0)
- assert(recordedData.flatten.toSet === generatedData.toSet)
+ val recordedBlocks = blockGeneratorListener.arrayBuffers
+ val recordedData = recordedBlocks.flatten
+ assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
+ assert(recordedData.toSet === generatedData.toSet, "Received data not same")
+
// recordedData size should be close to the expected rate
- assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
- recordedData.flatten.size <= expectedMessages * 1.1 )
- // the first and last block may be incomplete, so we slice them out
- recordedData.slice(1, recordedData.size - 1).foreach { block =>
- assert(block.size >= expectedMessagesPerBlock * 0.8 &&
- block.size <= expectedMessagesPerBlock * 1.2 )
- }
+ val minExpectedMessages = expectedMessages - 3
+ val maxExpectedMessages = expectedMessages + 1
+ val numMessages = recordedData.size
+ assert(
+ numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
+ s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
+ )
+
+ val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
+ val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
+ val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
+ println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes)
+ assert(
+ // the first and last block may be incomplete, so we slice them out
+ recordedBlocks.drop(1).dropRight(1).forall { block =>
+ block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
+ },
+ s"# records in received blocks = [$receivedBlockSizes], not between " +
+ s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
+ )
}
+
/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/