aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorIssac Buenrostro <buenrostro@ooyala.com>2014-07-10 16:01:08 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-07-10 16:01:28 -0700
commitca19cfbcd5cfac9ad731350dfeea14355aec87d6 (patch)
treef47633a40dd72afb30cc1c0de7838f6f8b192f43 /streaming/src/test
parentcb443cf6c3634d4416d9b708e7983c9283660724 (diff)
downloadspark-ca19cfbcd5cfac9ad731350dfeea14355aec87d6.tar.gz
spark-ca19cfbcd5cfac9ad731350dfeea14355aec87d6.tar.bz2
spark-ca19cfbcd5cfac9ad731350dfeea14355aec87d6.zip
[SPARK-1341] [Streaming] Throttle BlockGenerator to limit rate of data consumption.
Author: Issac Buenrostro <buenrostro@ooyala.com> Closes #945 from ibuenros/SPARK-1341-throttle and squashes the following commits: 5514916 [Issac Buenrostro] Formatting changes, added documentation for streaming throttling, stricter unit tests for throttling. 62f395f [Issac Buenrostro] Add comments and license to streaming RateLimiter.scala 7066438 [Issac Buenrostro] Moved throttle code to RateLimiter class, smoother pushing when throttling active ccafe09 [Issac Buenrostro] Throttle BlockGenerator to limit rate of data consumption. (cherry picked from commit 2dd67248503306bb08946b1796821e9f9ed4d00e) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala38
1 files changed, 38 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 303d149d28..2dfd1c1a5c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -146,6 +146,44 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
assert(recordedData.toSet === generatedData.toSet)
}
+ test("block generator throttling") {
+ val blockGeneratorListener = new FakeBlockGeneratorListener
+ val blockInterval = 50
+ val maxRate = 200
+ val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
+ set("spark.streaming.receiver.maxRate", maxRate.toString)
+ val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+ val expectedBlocks = 20
+ val waitTime = expectedBlocks * blockInterval
+ val expectedMessages = maxRate * waitTime / 1000
+ val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+ val generatedData = new ArrayBuffer[Int]
+
+ // Generate blocks
+ val startTime = System.currentTimeMillis()
+ blockGenerator.start()
+ var count = 0
+ while(System.currentTimeMillis - startTime < waitTime) {
+ blockGenerator += count
+ generatedData += count
+ count += 1
+ Thread.sleep(1)
+ }
+ blockGenerator.stop()
+
+ val recordedData = blockGeneratorListener.arrayBuffers
+ assert(blockGeneratorListener.arrayBuffers.size > 0)
+ assert(recordedData.flatten.toSet === generatedData.toSet)
+ // recordedData size should be close to the expected rate
+ assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
+ recordedData.flatten.size <= expectedMessages * 1.1 )
+ // the first and last block may be incomplete, so we slice them out
+ recordedData.slice(1, recordedData.size - 1).foreach { block =>
+ assert(block.size >= expectedMessagesPerBlock * 0.8 &&
+ block.size <= expectedMessagesPerBlock * 1.2 )
+ }
+ }
+
/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/