diff options
author | Iulian Dragos <jaguarul@gmail.com> | 2015-07-22 15:54:08 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-22 15:54:08 -0700 |
commit | 798dff7b4baa952c609725b852bcb6a9c9e5a317 (patch) | |
tree | 18cf81cb9b10ae26403866a74f9b1d776b5b5c00 /streaming/src/test | |
parent | fe26584a1f5b472fb2e87aa7259aec822a619a3b (diff) | |
download | spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.gz spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.tar.bz2 spark-798dff7b4baa952c609725b852bcb6a9c9e5a317.zip |
[SPARK-8975] [STREAMING] Adds a mechanism to send a new rate from the driver to the block generator
First step for [SPARK-7398](https://issues.apache.org/jira/browse/SPARK-7398).
tdas huitseeker
Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>
Closes #7471 from dragos/topic/streaming-bp/dynamic-rate and squashes the following commits:
8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala | 46 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala | 62 |
2 files changed, 108 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala new file mode 100644 index 0000000000..c6330eb367 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import org.apache.spark.SparkConf +import org.apache.spark.SparkFunSuite + +/** Testsuite for testing the network receiver behavior */ +class RateLimiterSuite extends SparkFunSuite { + + test("rate limiter initializes even without a maxRate set") { + val conf = new SparkConf() + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.getCurrentLimit == 105) + } + + test("rate limiter updates when below maxRate") { + val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "110") + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.getCurrentLimit == 105) + } + + test("rate limiter stays below maxRate despite large updates") { + val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "100") + val rateLimiter = new RateLimiter(conf){} + rateLimiter.updateRate(105) + assert(rateLimiter.getCurrentLimit === 100) + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index a6e783861d..aadb723175 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -17,11 +17,17 @@ package org.apache.spark.streaming.scheduler +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.Timeouts +import org.scalatest.time.SpanSugar._ import org.apache.spark.streaming._ import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils +import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag +import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */ class ReceiverTrackerSuite extends TestSuiteBase { @@ -72,8 +78,64 @@ class ReceiverTrackerSuite extends TestSuiteBase { assert(locations(0).length === 1) assert(locations(3).length === 1) } + + test("Receiver tracker - propagates rate limit") { + object ReceiverStartedWaiter extends StreamingListener { + @volatile + var started = false + + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { + started = true + } + } + + ssc.addStreamingListener(ReceiverStartedWaiter) + ssc.scheduler.listenerBus.start(ssc.sc) + + val newRateLimit = 100L + val inputDStream = new RateLimitInputDStream(ssc) + val tracker = new ReceiverTracker(ssc) + tracker.start() + + // we wait until the Receiver has registered with the tracker, + // otherwise our rate update is lost + eventually(timeout(5 seconds)) { + assert(ReceiverStartedWaiter.started) + } + tracker.sendRateUpdate(inputDStream.id, newRateLimit) + // this is an async message, we need to wait a bit for it to be processed + eventually(timeout(3 seconds)) { + assert(inputDStream.getCurrentRateLimit.get === newRateLimit) + } + } } +/** An input DStream with a hard-coded receiver that gives access to internals for testing. */ +private class RateLimitInputDStream(@transient ssc_ : StreamingContext) + extends ReceiverInputDStream[Int](ssc_) { + + override def getReceiver(): DummyReceiver = SingletonDummyReceiver + + def getCurrentRateLimit: Option[Long] = { + invokeExecutorMethod.getCurrentRateLimit + } + + private def invokeExecutorMethod: ReceiverSupervisor = { + val c = classOf[Receiver[_]] + val ex = c.getDeclaredMethod("executor") + ex.setAccessible(true) + ex.invoke(SingletonDummyReceiver).asInstanceOf[ReceiverSupervisor] + } +} + +/** + * A Receiver as an object so we can read its rate limit. + * + * @note It's necessary to be a top-level object, or else serialization would create another + * one on the executor side and we won't be able to read its rate limit. + */ +private object SingletonDummyReceiver extends DummyReceiver + /** * Dummy receiver implementation */ |