diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-04-28 19:31:57 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-04-28 19:31:57 -0700 |
commit | 5c8f4bd5fae539ab5fb992573d5357ed34e2f4d0 (patch) | |
tree | d122c478a8b9d763527e153b23fde98e3db23a90 /streaming/src | |
parent | d36e67350c516a96d58abd50a0d5d22b3b22f291 (diff) | |
download | spark-5c8f4bd5fae539ab5fb992573d5357ed34e2f4d0.tar.gz spark-5c8f4bd5fae539ab5fb992573d5357ed34e2f4d0.tar.bz2 spark-5c8f4bd5fae539ab5fb992573d5357ed34e2f4d0.zip |
[SPARK-7138] [STREAMING] Add method to BlockGenerator to add multiple records to BlockGenerator with single callback
This is to ensure that receivers that receive data in small batches (like Kinesis) and want to add them but want the callback function to be called only once. This is for internal use only for improvement to Kinesis Receiver that we are planning to do.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #5695 from tdas/SPARK-7138 and squashes the following commits:
a35cf7d [Tathagata Das] Fixed style.
a7a4cb9 [Tathagata Das] Added extra method to BlockGenerator.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index f4963a78e1..4bebcc5aa7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -126,6 +126,20 @@ private[streaming] class BlockGenerator( listener.onAddData(data, metadata) } + /** + * Push multiple data items into the buffer. After buffering the data, the + * `BlockGeneratorListener.onAddData` callback will be called. All received data items + * will be periodically pushed into BlockManager. Note that all the data items is guaranteed + * to be present in a single block. + */ + def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized { + dataIterator.foreach { data => + waitToPush() + currentBuffer += data + } + listener.onAddData(dataIterator, metadata) + } + /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { |