aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorIssac Buenrostro <buenrostro@ooyala.com>2014-07-10 16:01:08 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-07-10 16:01:08 -0700
commit2dd67248503306bb08946b1796821e9f9ed4d00e (patch)
treea39be5352c3e2ca6467fbca6bee6430ce84974d4 /streaming/src/main
parent40a8fef4e6619b4ea10a4ec9026260649ce5ae73 (diff)
downloadspark-2dd67248503306bb08946b1796821e9f9ed4d00e.tar.gz
spark-2dd67248503306bb08946b1796821e9f9ed4d00e.tar.bz2
spark-2dd67248503306bb08946b1796821e9f9ed4d00e.zip
[SPARK-1341] [Streaming] Throttle BlockGenerator to limit rate of data consumption.
Author: Issac Buenrostro <buenrostro@ooyala.com> Closes #945 from ibuenros/SPARK-1341-throttle and squashes the following commits: 5514916 [Issac Buenrostro] Formatting changes, added documentation for streaming throttling, stricter unit tests for throttling. 62f395f [Issac Buenrostro] Add comments and license to streaming RateLimiter.scala 7066438 [Issac Buenrostro] Moved throttle code to RateLimiter class, smoother pushing when throttling active ccafe09 [Issac Buenrostro] Throttle BlockGenerator to limit rate of data consumption.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala69
2 files changed, 71 insertions, 1 deletion
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 78cc2daa56..0316b6862f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -44,7 +44,7 @@ private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf
- ) extends Logging {
+ ) extends RateLimiter(conf) with Logging {
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
@@ -81,6 +81,7 @@ private[streaming] class BlockGenerator(
* will be periodically pushed into BlockManager.
*/
def += (data: Any): Unit = synchronized {
+ waitToPush()
currentBuffer += data
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
new file mode 100644
index 0000000000..e4f6ba626e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import org.apache.spark.{Logging, SparkConf}
+import java.util.concurrent.TimeUnit._
+
+/** Provides waitToPush() method to limit the rate at which receivers consume data.
+ *
+ * The waitToPush method will block the calling thread if too many messages have been pushed
+ * too quickly, and only returns once the new message may be pushed. It assumes that only one
+ * message is pushed at a time.
+ *
+ * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+ * per second that each receiver will accept.
+ *
+ * @param conf spark configuration
+ */
+private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
+
+  // Start (in nanoseconds) of the current rate-measurement window.
+  private var lastSyncTime = System.nanoTime
+  // Messages pushed since lastSyncTime.
+  private var messagesWrittenSinceSync = 0L
+  // Maximum messages per second; a value <= 0 disables throttling.
+  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
+  // Length of the measurement window before the counters are reset.
+  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+
+  /** Block the calling thread until one more message may be pushed without
+   * exceeding the configured rate. No-op when throttling is disabled. */
+  def waitToPush(): Unit = {
+    if (desiredRate > 0) {
+      val now = System.nanoTime
+      val elapsedNanosecs = math.max(now - lastSyncTime, 1)
+      val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
+      if (rate < desiredRate) {
+        // Under the limit: record this push, and start a new window if the
+        // current one has expired.
+        messagesWrittenSinceSync += 1
+        if (now > lastSyncTime + SYNC_INTERVAL) {
+          lastSyncTime = now
+          messagesWrittenSinceSync = 1
+        }
+      } else {
+        // Over the limit: sleep just long enough to bring the observed rate
+        // back down to the desired rate, then re-check (self tail call).
+        val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
+        val elapsedTimeInMillis = elapsedNanosecs / 1000000
+        val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
+        if (sleepTimeInMillis > 0) {
+          logTrace("Natural rate is " + rate + " per second but desired rate is " +
+            desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
+          Thread.sleep(sleepTimeInMillis)
+        }
+        waitToPush()
+      }
+    }
+  }
+}