diff options
Diffstat (limited to 'streaming/src')
3 files changed, 60 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 0316b6862f..55765dc906 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -27,9 +27,38 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} /** Listener object for BlockGenerator events */ private[streaming] trait BlockGeneratorListener { - /** Called when a new block needs to be pushed */ + /** + * Called after a data item is added into the BlockGenerator. The data addition and this + * callback are synchronized with the block generation and its associated callback, + * so block generation waits for the active data addition+callback to complete. This is useful + * for updating metadata on successful buffering of a data item, specifically that metadata + * that will be useful when a block is generated. Any long blocking operation in this callback + * will hurt the throughput. + */ + def onAddData(data: Any, metadata: Any) + + /** + * Called when a new block of data is generated by the block generator. The block generation + * and this callback are synchronized with the data addition and its associated callback, so + * the data addition waits for the block generation+callback to complete. This is useful + * for updating metadata when a block has been generated, specifically metadata that will + * be useful when the block has been successfully stored. Any long blocking operation in this + * callback will hurt the throughput. + */ + def onGenerateBlock(blockId: StreamBlockId) + + /** + * Called when a new block is ready to be pushed. Callers are supposed to store the block into + * Spark in this method. Internally this is called from a single + * thread, that is not synchronized with any other callbacks. Hence it is okay to do long + * blocking operation in this callback. + */ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) - /** Called when an error has occurred in BlockGenerator */ + + /** + * Called when an error has occurred in the BlockGenerator. Can be called form many places + * so better to not do any long block operation in this callback. + */ def onError(message: String, throwable: Throwable) } @@ -80,9 +109,20 @@ private[streaming] class BlockGenerator( * Push a single data item into the buffer. All received data items * will be periodically pushed into BlockManager. */ - def += (data: Any): Unit = synchronized { + def addData (data: Any): Unit = synchronized { + waitToPush() + currentBuffer += data + } + + /** + * Push a single data item into the buffer. After buffering the data, the + * `BlockGeneratorListnere.onAddData` callback will be called. All received data items + * will be periodically pushed into BlockManager. + */ + def addDataWithCallback(data: Any, metadata: Any) = synchronized { waitToPush() currentBuffer += data + listener.onAddData(data, metadata) } /** Change the buffer to which single records are added to. */ @@ -93,14 +133,15 @@ private[streaming] class BlockGenerator( if (newBlockBuffer.size > 0) { val blockId = StreamBlockId(receiverId, time - blockInterval) val newBlock = new Block(blockId, newBlockBuffer) + listener.onGenerateBlock(blockId) blocksForPushing.put(newBlock) // put is blocking when queue is full logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => logInfo("Block updating timer thread was interrupted") - case t: Throwable => - reportError("Error in block updating thread", t) + case e: Exception => + reportError("Error in block updating thread", e) } } @@ -126,8 +167,8 @@ private[streaming] class BlockGenerator( } catch { case ie: InterruptedException => logInfo("Block pushing thread was interrupted") - case t: Throwable => - reportError("Error in block pushing thread", t) + case e: Exception => + reportError("Error in block pushing thread", e) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 5360412330..3b1233e86c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -27,10 +27,10 @@ import akka.actor.{Actor, Props} import akka.pattern.ask import com.google.common.base.Throwables import org.apache.hadoop.conf.Configuration + import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.util.WriteAheadLogFileSegment import org.apache.spark.util.{AkkaUtils, Utils} /** @@ -99,6 +99,10 @@ private[streaming] class ReceiverSupervisorImpl( /** Divides received data records into data blocks for pushing in BlockManager. */ private val blockGenerator = new BlockGenerator(new BlockGeneratorListener { + def onAddData(data: Any, metadata: Any): Unit = { } + + def onGenerateBlock(blockId: StreamBlockId): Unit = { } + def onError(message: String, throwable: Throwable) { reportError(message, throwable) } @@ -110,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl( /** Push a single record of received data into block generator. */ def pushSingle(data: Any) { - blockGenerator += (data) + blockGenerator.addData(data) } /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 0f6a9489db..e26c0c6859 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator += count + blockGenerator.addData(count) generatedData += count count += 1 Thread.sleep(10) @@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator += count + blockGenerator.addData(count) generatedData += count count += 1 Thread.sleep(1) @@ -299,6 +299,10 @@ class ReceiverSuite extends FunSuite with Timeouts { val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] val errors = new ArrayBuffer[Throwable] + def onAddData(data: Any, metadata: Any) { } + + def onGenerateBlock(blockId: StreamBlockId) { } + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) arrayBuffers += bufferOfInts |