aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-07-22 15:54:08 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-22 15:54:08 -0700
commit798dff7b4baa952c609725b852bcb6a9c9e5a317 (patch)
tree18cf81cb9b10ae26403866a74f9b1d776b5b5c00 /streaming/src/main
parentfe26584a1f5b472fb2e87aa7259aec822a619a3b (diff)
downloadspark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.gz
spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.bz2
spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.zip
[SPARK-8975] [STREAMING] Adds a mechanism to send a new rate from the driver to the block generator
First step for [SPARK-7398](https://issues.apache.org/jira/browse/SPARK-7398). tdas huitseeker Author: Iulian Dragos <jaguarul@gmail.com> Author: François Garillot <francois@garillot.net> Closes #7471 from dragos/topic/streaming-bp/dynamic-rate and squashes the following commits: 8941cf9 [Iulian Dragos] Renames and other nitpicks. 162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior). 210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate." 0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate. 261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually` cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers. 6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975 d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate 4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala30
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala9
6 files changed, 45 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 8df542b367..f663def4c0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -34,12 +34,32 @@ import org.apache.spark.{Logging, SparkConf}
*/
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
- private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
- private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
+ // treated as an upper limit
+ private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
+ private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)
def waitToPush() {
- if (desiredRate > 0) {
- rateLimiter.acquire()
- }
+ rateLimiter.acquire()
}
+
+ /**
+ * Return the current rate limit. If no limit has been set so far, it returns {{{Long.MaxValue}}}.
+ */
+ def getCurrentLimit: Long =
+ rateLimiter.getRate.toLong
+
+ /**
+ * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
+ * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
+ *
+ * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
+ */
+ private[receiver] def updateRate(newRate: Long): Unit =
+ if (newRate > 0) {
+ if (maxRateLimit > 0) {
+ rateLimiter.setRate(newRate.min(maxRateLimit))
+ } else {
+ rateLimiter.setRate(newRate)
+ }
+ }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 5b5a3fe648..7504fa44d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}
/** Get the attached executor. */
- private def executor = {
+ private def executor: ReceiverSupervisor = {
assert(executor_ != null, "Executor has not been attached to this receiver")
executor_
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index 7bf3c33319..1eb55affaa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time
private[streaming] sealed trait ReceiverMessage extends Serializable
private[streaming] object StopReceiver extends ReceiverMessage
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage
-
+private[streaming] case class UpdateRateLimit(elementsPerSecond: Long)
+ extends ReceiverMessage
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 6467029a27..a7c220f426 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -59,6 +59,9 @@ private[streaming] abstract class ReceiverSupervisor(
/** Time between a receiver is stopped and started again */
private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
+ /** The current maximum rate limit for this receiver. */
+ private[streaming] def getCurrentRateLimit: Option[Long] = None
+
/** Exception associated with the stopping of the receiver */
@volatile protected var stoppingError: Throwable = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index f6ba66b3ae..2f6841ee88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -77,6 +77,9 @@ private[streaming] class ReceiverSupervisorImpl(
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
+ case UpdateRateLimit(eps) =>
+ logInfo(s"Received a new rate limit: $eps.")
+ blockGenerator.updateRate(eps)
}
})
@@ -98,6 +101,9 @@ private[streaming] class ReceiverSupervisorImpl(
}
}, streamId, env.conf)
+ override private[streaming] def getCurrentRateLimit: Option[Long] =
+ Some(blockGenerator.getCurrentLimit)
+
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
blockGenerator.addData(data)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 6910d81d98..9cc6ffcd12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
- StopReceiver}
+ StopReceiver, UpdateRateLimit}
import org.apache.spark.util.SerializableConfiguration
/**
@@ -226,6 +226,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logError(s"Deregistered receiver for stream $streamId: $messageWithError")
}
+ /** Update a receiver's maximum ingestion rate */
+ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
+ for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) {
+ eP.send(UpdateRateLimit(newRate))
+ }
+ }
+
/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
receivedBlockTracker.addBlock(receivedBlockInfo)