diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-06 14:35:30 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-06 14:36:55 -0700 |
commit | 3997dd3fde0f1f67ddc4941921a8ce1449bb44f0 (patch) | |
tree | 3dd33e8c34634c6797da37561e230faaadf2b395 /extras/kinesis-asl | |
parent | 3137628bcd21c5e07b738a54c4ed5664e4a12433 (diff) | |
download | spark-3997dd3fde0f1f67ddc4941921a8ce1449bb44f0.tar.gz spark-3997dd3fde0f1f67ddc4941921a8ce1449bb44f0.tar.bz2 spark-3997dd3fde0f1f67ddc4941921a8ce1449bb44f0.zip |
[SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates
In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generator with their custom listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track and rate updates can be applied.
In the process, I did some simplification, and de-flaki-fication of some rate controller related tests. In particular.
- Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`)
- Made `RateControllerSuite` faster (by increasing batch interval) and less flaky
- Changed a few internal API to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent at places).
- Updated existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #7913 from tdas/SPARK-9556 and squashes the following commits:
41d4461 [Tathagata Das] fix scala style
eb9fd59 [Tathagata Das] Updated kinesis receiver
d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator
d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization
f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556
31da173 [Tathagata Das] Fix bug
12116df [Tathagata Das] Add BlockGeneratorSuite
74bd069 [Tathagata Das] Fix style
989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it
3ff618c [Tathagata Das] Fix test
b40eff8 [Tathagata Das] slight refactoring
f0df0f1 [Tathagata Das] Scala style fixes
51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator
(cherry picked from commit 0a078303d08ad2bb92b9a8a6969563d75b512290)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'extras/kinesis-asl')
-rw-r--r-- | extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 2 |
1 files changed, 1 insertions, 1 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index a4baeec084..22324e821c 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -136,7 +136,7 @@ private[kinesis] class KinesisReceiver( * The KCL creates and manages the receiving/processing thread pool through Worker.run(). */ override def onStart() { - blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, SparkEnv.get.conf) + blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler) workerId = Utils.localHostName() + ":" + UUID.randomUUID() |