aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-08-06 14:35:30 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-06 14:35:30 -0700
commit0a078303d08ad2bb92b9a8a6969563d75b512290 (patch)
tree3dd33e8c34634c6797da37561e230faaadf2b395 /streaming/src/main/scala
parent21fdfd7d6f89adbd37066c169e6ba9ccd337683e (diff)
downloadspark-0a078303d08ad2bb92b9a8a6969563d75b512290.tar.gz
spark-0a078303d08ad2bb92b9a8a6969563d75b512290.tar.bz2
spark-0a078303d08ad2bb92b9a8a6969563d75b512290.zip
[SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates
In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generators with their own listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track of and rate updates can be applied. In the process, I did some simplification and de-flaki-fication of some rate controller related tests. In particular: - Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`) - Made `RateControllerSuite` faster (by increasing the batch interval) and less flaky - Changed a few internal APIs to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent in places). - Updated existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well.
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #7913 from tdas/SPARK-9556 and squashes the following commits: 41d4461 [Tathagata Das] fix scala style eb9fd59 [Tathagata Das] Updated kinesis receiver d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556 31da173 [Tathagata Das] Fix bug 12116df [Tathagata Das] Add BlockGeneratorSuite 74bd069 [Tathagata Das] Fix style 989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it 3ff618c [Tathagata Das] Fix test b40eff8 [Tathagata Das] slight refactoring f0df0f1 [Tathagata Das] Scala style fixes 51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator
Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala131
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala52
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala27
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala33
6 files changed, 176 insertions, 78 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index cd309788a7..7ec74016a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -144,7 +144,7 @@ private[streaming] class ActorReceiver[T: ClassTag](
receiverSupervisorStrategy: SupervisorStrategy
) extends Receiver[T](storageLevel) with Logging {
- protected lazy val supervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
+ protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
class Supervisor extends Actor {
@@ -191,11 +191,11 @@ private[streaming] class ActorReceiver[T: ClassTag](
}
def onStart(): Unit = {
- supervisor
- logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
+ actorSupervisor
+ logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path)
}
def onStop(): Unit = {
- supervisor ! PoisonPill
+ actorSupervisor ! PoisonPill
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 92b51ce392..794dece370 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -21,10 +21,10 @@ import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{Clock, SystemClock}
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
@@ -69,16 +69,35 @@ private[streaming] trait BlockGeneratorListener {
* named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch of as a block,
* the other to push the blocks into the block manager.
+ *
+ * Note: Do not create BlockGenerator instances directly inside receivers. Use
+ * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
*/
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
- conf: SparkConf
+ conf: SparkConf,
+ clock: Clock = new SystemClock()
) extends RateLimiter(conf) with Logging {
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
- private val clock = new SystemClock()
+ /**
+ * The BlockGenerator can be in 5 possible states, in the order as follows.
+ * - Initialized: Nothing has been started
+ * - Active: start() has been called, and it is generating blocks on added data.
+ * - StoppedAddingData: stop() has been called, the adding of data has been stopped,
+ * but blocks are still being generated and pushed.
+ * - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
+ * they are still being pushed.
+ * - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
+ */
+ private object GeneratorState extends Enumeration {
+ type GeneratorState = Value
+ val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
+ }
+ import GeneratorState._
+
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
@@ -89,59 +108,100 @@ private[streaming] class BlockGenerator(
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
@volatile private var currentBuffer = new ArrayBuffer[Any]
- @volatile private var stopped = false
+ @volatile private var state = Initialized
/** Start block generating and pushing threads. */
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Started BlockGenerator")
+ def start(): Unit = synchronized {
+ if (state == Initialized) {
+ state = Active
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Started BlockGenerator")
+ } else {
+ throw new SparkException(
+ s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
+ }
}
- /** Stop all threads. */
- def stop() {
+ /**
+ * Stop everything in the right order such that all the data added is pushed out correctly.
+ * - First, stop adding data to the current buffer.
+ * - Second, stop generating blocks.
+ * - Finally, wait for queue of to-be-pushed blocks to be drained.
+ */
+ def stop(): Unit = {
+ // Set the state to stop adding data
+ synchronized {
+ if (state == Active) {
+ state = StoppedAddingData
+ } else {
+ logWarning(s"Cannot stop BlockGenerator as its not in the Active state [state = $state]")
+ return
+ }
+ }
+
+ // Stop generating blocks and set the state for block pushing thread to start draining the queue
logInfo("Stopping BlockGenerator")
blockIntervalTimer.stop(interruptTimer = false)
- stopped = true
- logInfo("Waiting for block pushing thread")
+ synchronized { state = StoppedGeneratingBlocks }
+
+ // Wait for the queue to drain and mark generated as stopped
+ logInfo("Waiting for block pushing thread to terminate")
blockPushingThread.join()
+ synchronized { state = StoppedAll }
logInfo("Stopped BlockGenerator")
}
/**
- * Push a single data item into the buffer. All received data items
- * will be periodically pushed into BlockManager.
+ * Push a single data item into the buffer.
*/
- def addData (data: Any): Unit = synchronized {
- waitToPush()
- currentBuffer += data
+ def addData(data: Any): Unit = synchronized {
+ if (state == Active) {
+ waitToPush()
+ currentBuffer += data
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
}
/**
* Push a single data item into the buffer. After buffering the data, the
- * `BlockGeneratorListener.onAddData` callback will be called. All received data items
- * will be periodically pushed into BlockManager.
+ * `BlockGeneratorListener.onAddData` callback will be called.
*/
def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
- waitToPush()
- currentBuffer += data
- listener.onAddData(data, metadata)
+ if (state == Active) {
+ waitToPush()
+ currentBuffer += data
+ listener.onAddData(data, metadata)
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
}
/**
* Push multiple data items into the buffer. After buffering the data, the
- * `BlockGeneratorListener.onAddData` callback will be called. All received data items
- * will be periodically pushed into BlockManager. Note that all the data items is guaranteed
- * to be present in a single block.
+ * `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items
+ * are atomically added to the buffer, and are hence guaranteed to be present in a single block.
*/
def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
- dataIterator.foreach { data =>
- waitToPush()
- currentBuffer += data
+ if (state == Active) {
+ dataIterator.foreach { data =>
+ waitToPush()
+ currentBuffer += data
+ }
+ listener.onAddData(dataIterator, metadata)
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
}
- listener.onAddData(dataIterator, metadata)
}
+ def isActive(): Boolean = state == Active
+
+ def isStopped(): Boolean = state == StoppedAll
+
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
@@ -165,18 +225,21 @@ private[streaming] class BlockGenerator(
/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
+
+ def isGeneratingBlocks = synchronized { state == Active || state == StoppedAddingData }
try {
- while (!stopped) {
- Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
+ while (isGeneratingBlocks) {
+ Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
- // Push out the blocks that are still left
+
+ // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
while (!blocksForPushing.isEmpty) {
- logDebug("Getting block ")
val block = blocksForPushing.take()
+ logDebug(s"Pushing block $block")
pushBlock(block)
logInfo("Blocks left to push " + blocksForPushing.size())
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index f663def4c0..bca1fbc8fd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -45,8 +45,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
/**
* Return the current rate limit. If no limit has been set so far, it returns {{{Long.MaxValue}}}.
*/
- def getCurrentLimit: Long =
- rateLimiter.getRate.toLong
+ def getCurrentLimit: Long = rateLimiter.getRate.toLong
/**
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 7504fa44d9..554aae0117 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -116,12 +116,12 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* being pushed into Spark's memory.
*/
def store(dataItem: T) {
- executor.pushSingle(dataItem)
+ supervisor.pushSingle(dataItem)
}
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def store(dataBuffer: ArrayBuffer[T]) {
- executor.pushArrayBuffer(dataBuffer, None, None)
+ supervisor.pushArrayBuffer(dataBuffer, None, None)
}
/**
@@ -130,12 +130,12 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* for being used in the corresponding InputDStream.
*/
def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
- executor.pushArrayBuffer(dataBuffer, Some(metadata), None)
+ supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
}
/** Store an iterator of received data as a data block into Spark's memory. */
def store(dataIterator: Iterator[T]) {
- executor.pushIterator(dataIterator, None, None)
+ supervisor.pushIterator(dataIterator, None, None)
}
/**
@@ -144,12 +144,12 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* for being used in the corresponding InputDStream.
*/
def store(dataIterator: java.util.Iterator[T], metadata: Any) {
- executor.pushIterator(dataIterator, Some(metadata), None)
+ supervisor.pushIterator(dataIterator, Some(metadata), None)
}
/** Store an iterator of received data as a data block into Spark's memory. */
def store(dataIterator: java.util.Iterator[T]) {
- executor.pushIterator(dataIterator, None, None)
+ supervisor.pushIterator(dataIterator, None, None)
}
/**
@@ -158,7 +158,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* for being used in the corresponding InputDStream.
*/
def store(dataIterator: Iterator[T], metadata: Any) {
- executor.pushIterator(dataIterator, Some(metadata), None)
+ supervisor.pushIterator(dataIterator, Some(metadata), None)
}
/**
@@ -167,7 +167,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* that Spark is configured to use.
*/
def store(bytes: ByteBuffer) {
- executor.pushBytes(bytes, None, None)
+ supervisor.pushBytes(bytes, None, None)
}
/**
@@ -176,12 +176,12 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* for being used in the corresponding InputDStream.
*/
def store(bytes: ByteBuffer, metadata: Any) {
- executor.pushBytes(bytes, Some(metadata), None)
+ supervisor.pushBytes(bytes, Some(metadata), None)
}
/** Report exceptions in receiving data. */
def reportError(message: String, throwable: Throwable) {
- executor.reportError(message, throwable)
+ supervisor.reportError(message, throwable)
}
/**
@@ -193,7 +193,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* The `message` will be reported to the driver.
*/
def restart(message: String) {
- executor.restartReceiver(message)
+ supervisor.restartReceiver(message)
}
/**
@@ -205,7 +205,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* The `message` and `exception` will be reported to the driver.
*/
def restart(message: String, error: Throwable) {
- executor.restartReceiver(message, Some(error))
+ supervisor.restartReceiver(message, Some(error))
}
/**
@@ -215,22 +215,22 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* in a background thread.
*/
def restart(message: String, error: Throwable, millisecond: Int) {
- executor.restartReceiver(message, Some(error), millisecond)
+ supervisor.restartReceiver(message, Some(error), millisecond)
}
/** Stop the receiver completely. */
def stop(message: String) {
- executor.stop(message, None)
+ supervisor.stop(message, None)
}
/** Stop the receiver completely due to an exception */
def stop(message: String, error: Throwable) {
- executor.stop(message, Some(error))
+ supervisor.stop(message, Some(error))
}
/** Check if the receiver has started or not. */
def isStarted(): Boolean = {
- executor.isReceiverStarted()
+ supervisor.isReceiverStarted()
}
/**
@@ -238,7 +238,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* the receiving of data should be stopped.
*/
def isStopped(): Boolean = {
- executor.isReceiverStopped()
+ supervisor.isReceiverStopped()
}
/**
@@ -257,7 +257,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
private var id: Int = -1
/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
- private[streaming] var executor_ : ReceiverSupervisor = null
+ @transient private var _supervisor : ReceiverSupervisor = null
/** Set the ID of the DStream that this receiver is associated with. */
private[streaming] def setReceiverId(id_ : Int) {
@@ -265,15 +265,17 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}
/** Attach Network Receiver executor to this receiver. */
- private[streaming] def attachExecutor(exec: ReceiverSupervisor) {
- assert(executor_ == null)
- executor_ = exec
+ private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
+ assert(_supervisor == null)
+ _supervisor = exec
}
- /** Get the attached executor. */
- private def executor: ReceiverSupervisor = {
- assert(executor_ != null, "Executor has not been attached to this receiver")
- executor_
+ /** Get the attached supervisor. */
+ private[streaming] def supervisor: ReceiverSupervisor = {
+ assert(_supervisor != null,
+ "A ReceiverSupervisor have not been attached to the receiver yet. Maybe you are starting " +
+ "some computation in the receiver before the Receiver.onStart() has been called.")
+ _supervisor
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index e98017a637..158d1ba2f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -44,8 +44,8 @@ private[streaming] abstract class ReceiverSupervisor(
}
import ReceiverState._
- // Attach the executor to the receiver
- receiver.attachExecutor(this)
+ // Attach the supervisor to the receiver
+ receiver.attachSupervisor(this)
private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))
@@ -60,7 +60,7 @@ private[streaming] abstract class ReceiverSupervisor(
private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
/** The current maximum rate limit for this receiver. */
- private[streaming] def getCurrentRateLimit: Option[Long] = None
+ private[streaming] def getCurrentRateLimit: Long = Long.MaxValue
/** Exception associated with the stopping of the receiver */
@volatile protected var stoppingError: Throwable = null
@@ -92,13 +92,30 @@ private[streaming] abstract class ReceiverSupervisor(
optionalBlockId: Option[StreamBlockId]
)
+ /**
+ * Create a custom [[BlockGenerator]] that the receiver implementation can directly control
+ * using their provided [[BlockGeneratorListener]].
+ *
+ * Note: Do not explicitly start or stop the `BlockGenerator`, the `ReceiverSupervisorImpl`
+ * will take care of it.
+ */
+ def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator
+
/** Report errors. */
def reportError(message: String, throwable: Throwable)
- /** Called when supervisor is started */
+ /**
+ * Called when supervisor is started.
+ * Note that this must be called before the receiver.onStart() is called to ensure
+ * things like [[BlockGenerator]]s are started before the receiver starts sending data.
+ */
protected def onStart() { }
- /** Called when supervisor is stopped */
+ /**
+ * Called when supervisor is stopped.
+ * Note that this must be called after the receiver.onStop() is called to ensure
+ * things like [[BlockGenerator]]s are cleaned up after the receiver stops sending data.
+ */
protected def onStop(message: String, error: Option[Throwable]) { }
/** Called when receiver is started. Return true if the driver accepts us */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 0d802f8354..59ef58d232 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
@@ -81,15 +82,20 @@ private[streaming] class ReceiverSupervisorImpl(
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
- blockGenerator.updateRate(eps)
+ registeredBlockGenerators.foreach { bg =>
+ bg.updateRate(eps)
+ }
}
})
/** Unique block ids if one wants to add blocks directly */
private val newBlockId = new AtomicLong(System.currentTimeMillis())
+ private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
+ with mutable.SynchronizedBuffer[BlockGenerator]
+
/** Divides received data records into data blocks for pushing in BlockManager. */
- private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+ private val defaultBlockGeneratorListener = new BlockGeneratorListener {
def onAddData(data: Any, metadata: Any): Unit = { }
def onGenerateBlock(blockId: StreamBlockId): Unit = { }
@@ -101,14 +107,15 @@ private[streaming] class ReceiverSupervisorImpl(
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
- }, streamId, env.conf)
+ }
+ private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
- override private[streaming] def getCurrentRateLimit: Option[Long] =
- Some(blockGenerator.getCurrentLimit)
+ /** Get the current rate limit of the default block generator */
+ override private[streaming] def getCurrentRateLimit: Long = defaultBlockGenerator.getCurrentLimit
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
- blockGenerator.addData(data)
+ defaultBlockGenerator.addData(data)
}
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
@@ -162,11 +169,11 @@ private[streaming] class ReceiverSupervisorImpl(
}
override protected def onStart() {
- blockGenerator.start()
+ registeredBlockGenerators.foreach { _.start() }
}
override protected def onStop(message: String, error: Option[Throwable]) {
- blockGenerator.stop()
+ registeredBlockGenerators.foreach { _.stop() }
env.rpcEnv.stop(endpoint)
}
@@ -183,6 +190,16 @@ private[streaming] class ReceiverSupervisorImpl(
logInfo("Stopped receiver " + streamId)
}
+ override def createBlockGenerator(
+ blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
+ // Cleanup BlockGenerators that have already been stopped
+ registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
+
+ val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+ registeredBlockGenerators += newBlockGenerator
+ newBlockGenerator
+ }
+
/** Generate new block ID */
private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)