diff options
author | Iulian Dragos <jaguarul@gmail.com> | 2015-07-22 15:54:08 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-22 15:54:08 -0700 |
commit | 798dff7b4baa952c609725b852bcb6a9c9e5a317 (patch) | |
tree | 18cf81cb9b10ae26403866a74f9b1d776b5b5c00 /streaming/src/main | |
parent | fe26584a1f5b472fb2e87aa7259aec822a619a3b (diff) | |
download | spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.gz spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.bz2 spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.zip |
[SPARK-8975] [STREAMING] Adds a mechanism to send a new rate from the driver to the block generator
First step for [SPARK-7398](https://issues.apache.org/jira/browse/SPARK-7398).
tdas huitseeker
Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>
Closes #7471 from dragos/topic/streaming-bp/dynamic-rate and squashes the following commits:
8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
Diffstat (limited to 'streaming/src/main')
6 files changed, 45 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 8df542b367..f663def4c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -34,12 +34,32 @@ import org.apache.spark.{Logging, SparkConf} */ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { - private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0) - private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate) + // treated as an upper limit + private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) + private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble) def waitToPush() { - if (desiredRate > 0) { - rateLimiter.acquire() - } + rateLimiter.acquire() } + + /** + * Return the current rate limit. If no limit has been set so far, it returns {{{Long.MaxValue}}}. + */ + def getCurrentLimit: Long = + rateLimiter.getRate.toLong + + /** + * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by + * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. + * + * @param newRate A new rate in events per second. It has no effect if it's 0 or negative. + */ + private[receiver] def updateRate(newRate: Long): Unit = + if (newRate > 0) { + if (maxRateLimit > 0) { + rateLimiter.setRate(newRate.min(maxRateLimit)) + } else { + rateLimiter.setRate(newRate) + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 5b5a3fe648..7504fa44d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable } /** Get the attached executor. */ - private def executor = { + private def executor: ReceiverSupervisor = { assert(executor_ != null, "Executor has not been attached to this receiver") executor_ } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala index 7bf3c33319..1eb55affaa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala @@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time private[streaming] sealed trait ReceiverMessage extends Serializable private[streaming] object StopReceiver extends ReceiverMessage private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage - +private[streaming] case class UpdateRateLimit(elementsPerSecond: Long) + extends ReceiverMessage diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 6467029a27..a7c220f426 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -59,6 +59,9 @@ private[streaming] abstract class ReceiverSupervisor( /** Time between a receiver is stopped and started again */ private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000) + /** The current maximum rate limit for this receiver. */ + private[streaming] def getCurrentRateLimit: Option[Long] = None + /** Exception associated with the stopping of the receiver */ @volatile protected var stoppingError: Throwable = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index f6ba66b3ae..2f6841ee88 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -77,6 +77,9 @@ private[streaming] class ReceiverSupervisorImpl( case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) + case UpdateRateLimit(eps) => + logInfo(s"Received a new rate limit: $eps.") + blockGenerator.updateRate(eps) } }) @@ -98,6 +101,9 @@ private[streaming] class ReceiverSupervisorImpl( } }, streamId, env.conf) + override private[streaming] def getCurrentRateLimit: Option[Long] = + Some(blockGenerator.getCurrentLimit) + /** Push a single record of received data into block generator. */ def pushSingle(data: Any) { blockGenerator.addData(data) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 6910d81d98..9cc6ffcd12 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -26,7 +26,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.rpc._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, - StopReceiver} + StopReceiver, UpdateRateLimit} import org.apache.spark.util.SerializableConfiguration /** @@ -226,6 +226,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logError(s"Deregistered receiver for stream $streamId: $messageWithError") } + /** Update a receiver's maximum ingestion rate */ + def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { + for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) { + eP.send(UpdateRateLimit(newRate)) + } + } + /** Add new blocks for the given stream */ private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { receivedBlockTracker.addBlock(receivedBlockInfo) |