aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-07-22 15:54:08 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-22 15:54:08 -0700
commit798dff7b4baa952c609725b852bcb6a9c9e5a317 (patch)
tree18cf81cb9b10ae26403866a74f9b1d776b5b5c00 /streaming/src/test
parentfe26584a1f5b472fb2e87aa7259aec822a619a3b (diff)
downloadspark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.gz
spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.bz2
spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.zip
[SPARK-8975] [STREAMING] Adds a mechanism to send a new rate from the driver to the block generator
First step for [SPARK-7398](https://issues.apache.org/jira/browse/SPARK-7398). tdas huitseeker Author: Iulian Dragos <jaguarul@gmail.com> Author: François Garillot <francois@garillot.net> Closes #7471 from dragos/topic/streaming-bp/dynamic-rate and squashes the following commits: 8941cf9 [Iulian Dragos] Renames and other nitpicks. 162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior). 210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate." 0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate. 261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually` cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers. 6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975 d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate 4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala46
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala62
2 files changed, 108 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala
new file mode 100644
index 0000000000..c6330eb367
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkFunSuite
+
+/** Testsuite for testing the network receiver behavior */
+class RateLimiterSuite extends SparkFunSuite {
+
+ test("rate limiter initializes even without a maxRate set") {
+ val conf = new SparkConf()
+ val rateLimiter = new RateLimiter(conf){}
+ rateLimiter.updateRate(105)
+ assert(rateLimiter.getCurrentLimit == 105)
+ }
+
+ test("rate limiter updates when below maxRate") {
+ val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110")
+ val rateLimiter = new RateLimiter(conf){}
+ rateLimiter.updateRate(105)
+ assert(rateLimiter.getCurrentLimit == 105)
+ }
+
+ test("rate limiter stays below maxRate despite large updates") {
+ val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100")
+ val rateLimiter = new RateLimiter(conf){}
+ rateLimiter.updateRate(105)
+ assert(rateLimiter.getCurrentLimit === 100)
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index a6e783861d..aadb723175 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -17,11 +17,17 @@
package org.apache.spark.streaming.scheduler
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver._
import org.apache.spark.util.Utils
+import org.apache.spark.streaming.dstream.InputDStream
+import scala.reflect.ClassTag
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
/** Testsuite for receiver scheduling */
class ReceiverTrackerSuite extends TestSuiteBase {
@@ -72,8 +78,64 @@ class ReceiverTrackerSuite extends TestSuiteBase {
assert(locations(0).length === 1)
assert(locations(3).length === 1)
}
+
+ test("Receiver tracker - propagates rate limit") {
+ object ReceiverStartedWaiter extends StreamingListener {
+ @volatile
+ var started = false
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+ started = true
+ }
+ }
+
+ ssc.addStreamingListener(ReceiverStartedWaiter)
+ ssc.scheduler.listenerBus.start(ssc.sc)
+
+ val newRateLimit = 100L
+ val inputDStream = new RateLimitInputDStream(ssc)
+ val tracker = new ReceiverTracker(ssc)
+ tracker.start()
+
+ // we wait until the Receiver has registered with the tracker,
+ // otherwise our rate update is lost
+ eventually(timeout(5 seconds)) {
+ assert(ReceiverStartedWaiter.started)
+ }
+ tracker.sendRateUpdate(inputDStream.id, newRateLimit)
+ // this is an async message, we need to wait a bit for it to be processed
+ eventually(timeout(3 seconds)) {
+ assert(inputDStream.getCurrentRateLimit.get === newRateLimit)
+ }
+ }
}
+/** An input DStream with a hard-coded receiver that gives access to internals for testing. */
+private class RateLimitInputDStream(@transient ssc_ : StreamingContext)
+ extends ReceiverInputDStream[Int](ssc_) {
+
+ override def getReceiver(): DummyReceiver = SingletonDummyReceiver
+
+ def getCurrentRateLimit: Option[Long] = {
+ invokeExecutorMethod.getCurrentRateLimit
+ }
+
+ private def invokeExecutorMethod: ReceiverSupervisor = {
+ val c = classOf[Receiver[_]]
+ val ex = c.getDeclaredMethod("executor")
+ ex.setAccessible(true)
+ ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor]
+ }
+}
+
+/**
+ * A Receiver as an object so we can read its rate limit.
+ *
+ * @note It's necessary to be a top-level object, or else serialization would create another
+ * one on the executor side and we won't be able to read its rate limit.
+ */
+private object SingletonDummyReceiver extends DummyReceiver
+
/**
* Dummy receiver implementation
*/