aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author: Issac Buenrostro <buenrostro@ooyala.com> 2014-07-10 16:01:08 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2014-07-10 16:01:08 -0700
commit: 2dd67248503306bb08946b1796821e9f9ed4d00e (patch)
tree: a39be5352c3e2ca6467fbca6bee6430ce84974d4 /streaming
parent: 40a8fef4e6619b4ea10a4ec9026260649ce5ae73 (diff)
downloadspark-2dd67248503306bb08946b1796821e9f9ed4d00e.tar.gz
spark-2dd67248503306bb08946b1796821e9f9ed4d00e.tar.bz2
spark-2dd67248503306bb08946b1796821e9f9ed4d00e.zip
[SPARK-1341] [Streaming] Throttle BlockGenerator to limit rate of data consumption.
Author: Issac Buenrostro <buenrostro@ooyala.com> Closes #945 from ibuenros/SPARK-1341-throttle and squashes the following commits: 5514916 [Issac Buenrostro] Formatting changes, added documentation for streaming throttling, stricter unit tests for throttling. 62f395f [Issac Buenrostro] Add comments and license to streaming RateLimiter.scala 7066438 [Issac Buenrostro] Moved throttle code to RateLimiter class, smoother pushing when throttling active ccafe09 [Issac Buenrostro] Throttle BlockGenerator to limit rate of data consumption.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 3
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 69
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala | 38
3 files changed, 109 insertions, 1 deletion
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 78cc2daa56..0316b6862f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -44,7 +44,7 @@ private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf
- ) extends Logging {
+ ) extends RateLimiter(conf) with Logging {
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
@@ -81,6 +81,7 @@ private[streaming] class BlockGenerator(
* will be periodically pushed into BlockManager.
*/
def += (data: Any): Unit = synchronized {
+ waitToPush() // may block here, inside the synchronized section, to enforce the max rate
currentBuffer += data
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
new file mode 100644
index 0000000000..e4f6ba626e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import org.apache.spark.{Logging, SparkConf}
+import java.util.concurrent.TimeUnit._
+
+/** Provides the waitToPush() method to limit the rate at which receivers consume data.
+ *
+ * waitToPush() blocks the calling thread while messages are being pushed faster than the
+ * configured limit, and returns once one more message may be pushed. It assumes that exactly
+ * one message is pushed per call; its counters are not synchronized, so callers serialize calls.
+ *
+ * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+ * per second that each receiver will accept; zero or a negative value disables the limit.
+ *
+ * @param conf spark configuration
+ */
+private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
+
+  private var lastSyncTime = System.nanoTime
+  private var messagesWrittenSinceSync = 0L
+  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
+  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+
+  /** Blocks until pushing one more message keeps the rate below the limit, then counts it. */
+  def waitToPush(): Unit = {
+    // Loop, not recursion: scalac cannot turn a self-call in an overridable method into a loop.
+    var pushed = desiredRate <= 0
+    while (!pushed) {
+      val now = System.nanoTime
+      val elapsedNanosecs = math.max(now - lastSyncTime, 1)
+      val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
+      if (rate < desiredRate) {
+        // It's okay to write; count the message and resync once the sync interval has passed.
+        messagesWrittenSinceSync += 1
+        if (now > lastSyncTime + SYNC_INTERVAL) {
+          lastSyncTime = now
+          messagesWrittenSinceSync = 1
+        }
+        pushed = true
+      } else {
+        // Over the limit: sleep for the time still needed to fall back to the desired rate.
+        // Millisecond truncation can yield 0 here; sleep at least 1 ms instead of busy-spinning.
+        val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
+        val elapsedTimeInMillis = elapsedNanosecs / 1000000
+        val sleepTimeInMillis = math.max(targetTimeInMillis - elapsedTimeInMillis, 1L)
+        logTrace("Natural rate is " + rate + " per second but desired rate is " +
+          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
+        Thread.sleep(sleepTimeInMillis)
+      }
+    }
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index d9ac3c91f6..f4e11f975d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -145,6 +145,44 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
assert(recordedData.toSet === generatedData.toSet)
}
+  test("block generator throttling") {
+    val maxRate = 200
+    val blockInterval = 50
+    val expectedBlocks = 20
+    val waitTime = expectedBlocks * blockInterval
+    val expectedMessages = maxRate * waitTime / 1000
+    val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+      .set("spark.streaming.receiver.maxRate", maxRate.toString)
+    val listener = new FakeBlockGeneratorListener
+    val generator = new BlockGenerator(listener, 1, conf)
+    val sent = new ArrayBuffer[Int]
+
+    // Offer one record per iteration, pausing 1 ms in between, until waitTime ms have elapsed
+    val startTime = System.currentTimeMillis()
+    generator.start()
+    var next = 0
+    while (System.currentTimeMillis < startTime + waitTime) {
+      generator += next
+      sent += next
+      next += 1
+      Thread.sleep(1)
+    }
+    generator.stop()
+
+    val blocks = listener.arrayBuffers
+    assert(blocks.size > 0)
+    val received = blocks.flatten
+    assert(received.toSet === sent.toSet)
+    // the total number of received records should be within 10% of the expected count
+    assert(received.size >= expectedMessages * 0.9 && received.size <= expectedMessages * 1.1)
+    // the first and last block may be incomplete, so only the inner blocks are checked
+    blocks.drop(1).dropRight(1).foreach { block =>
+      assert(block.size >= expectedMessagesPerBlock * 0.8 &&
+        block.size <= expectedMessagesPerBlock * 1.2)
+    }
+  }
+
/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/