aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala (renamed from streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala)44
1 files changed, 30 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index eb6e88cf55..0f6a9489db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -31,9 +31,9 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
/** Testsuite for testing the network receiver behavior */
-class NetworkReceiverSuite extends FunSuite with Timeouts {
+class ReceiverSuite extends FunSuite with Timeouts {
- test("network receiver life cycle") {
+ test("receiver life cycle") {
val receiver = new FakeReceiver
val executor = new FakeReceiverSupervisor(receiver)
@@ -152,8 +152,8 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
test("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener
- val blockInterval = 50
- val maxRate = 200
+ val blockInterval = 100
+ val maxRate = 100
val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
@@ -175,19 +175,35 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
}
blockGenerator.stop()
- val recordedData = blockGeneratorListener.arrayBuffers
- assert(blockGeneratorListener.arrayBuffers.size > 0)
- assert(recordedData.flatten.toSet === generatedData.toSet)
+ val recordedBlocks = blockGeneratorListener.arrayBuffers
+ val recordedData = recordedBlocks.flatten
+ assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
+ assert(recordedData.toSet === generatedData.toSet, "Received data not same")
+
// recordedData size should be close to the expected rate
- assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
- recordedData.flatten.size <= expectedMessages * 1.1 )
- // the first and last block may be incomplete, so we slice them out
- recordedData.slice(1, recordedData.size - 1).foreach { block =>
- assert(block.size >= expectedMessagesPerBlock * 0.8 &&
- block.size <= expectedMessagesPerBlock * 1.2 )
- }
+ val minExpectedMessages = expectedMessages - 3
+ val maxExpectedMessages = expectedMessages + 1
+ val numMessages = recordedData.size
+ assert(
+ numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
+ s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
+ )
+
+ val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
+ val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
+ val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
+ println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes)
+ assert(
+ // the first and last block may be incomplete, so we slice them out
+ recordedBlocks.drop(1).dropRight(1).forall { block =>
+ block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
+ },
+ s"# records in received blocks = [$receivedBlockSizes], not between " +
+ s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
+ )
}
+
/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/