author Tathagata Das <tathagata.das1565@gmail.com> 2015-04-28 19:31:57 -0700
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-04-28 19:31:57 -0700
commit 5c8f4bd5fae539ab5fb992573d5357ed34e2f4d0
tree d122c478a8b9d763527e153b23fde98e3db23a90 /streaming
parent d36e67350c516a96d58abd50a0d5d22b3b22f291
[SPARK-7138] [STREAMING] Add method to BlockGenerator to add multiple records to BlockGenerator with single callback
This ensures that receivers which receive data in small batches (like Kinesis) can add all the records of a batch while having the callback function called only once. This is for internal use only, for an improvement to the Kinesis Receiver that we are planning.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5695 from tdas/SPARK-7138 and squashes the following commits:

a35cf7d [Tathagata Das] Fixed style.
a7a4cb9 [Tathagata Das] Added extra method to BlockGenerator.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 14
1 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index f4963a78e1..4bebcc5aa7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -126,6 +126,20 @@ private[streaming] class BlockGenerator(
     listener.onAddData(data, metadata)
   }
 
+  /**
+   * Push multiple data items into the buffer. After buffering the data, the
+   * `BlockGeneratorListener.onAddData` callback will be called. All received data items
+   * will be periodically pushed into BlockManager. Note that all the data items is guaranteed
+   * to be present in a single block.
+   */
+  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
+    dataIterator.foreach { data =>
+      waitToPush()
+      currentBuffer += data
+    }
+    listener.onAddData(dataIterator, metadata)
+  }
+
   /** Change the buffer to which single records are added to. */
   private def updateCurrentBuffer(time: Long): Unit = synchronized {
     try {
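
The single-callback behaviour added by this patch can be tried outside Spark with a small standalone model. This is only a sketch under stated assumptions: `AddDataListener`, `BufferSketch` and `SingleCallbackDemo` are hypothetical names that are not part of Spark, and rate limiting (`waitToPush()`) and the periodic block timer are left out. It keeps the shape of the patched method: every record is buffered under one lock, then the callback is invoked once for the whole batch.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for BlockGeneratorListener; only onAddData is modeled.
trait AddDataListener {
  def onAddData(data: Any, metadata: Any): Unit
}

// Simplified stand-in for BlockGenerator (assumption: no rate limiting, no block timer).
class BufferSketch(listener: AddDataListener) {
  private val currentBuffer = new ArrayBuffer[Any]

  // Same shape as the method in the patch: buffer every record while holding
  // the lock, then invoke the callback exactly once for the whole batch.
  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
    dataIterator.foreach { data => currentBuffer += data }
    listener.onAddData(dataIterator, metadata)
  }

  // Snapshot of what has been buffered so far.
  def buffered: List[Any] = synchronized { currentBuffer.toList }
}

object SingleCallbackDemo {
  // Returns (records buffered, callbacks fired, whether the iterator still had data in the callback).
  def run(): (Int, Int, Boolean) = {
    var callbacks = 0
    var iteratorHadNext = true
    val listener = new AddDataListener {
      def onAddData(data: Any, metadata: Any): Unit = {
        callbacks += 1
        data match {
          case it: Iterator[_] => iteratorHadNext = it.hasNext
          case _ => ()
        }
      }
    }
    val gen = new BufferSketch(listener)
    // "seq-range-1" is made-up metadata, e.g. a range of sequence numbers.
    gen.addMultipleDataWithCallback(Iterator("r1", "r2", "r3"), "seq-range-1")
    (gen.buffered.size, callbacks, iteratorHadNext)
  }

  def main(args: Array[String]): Unit = {
    val (n, c, hasNext) = run()
    println(s"buffered=$n callbacks=$c iteratorStillHasData=$hasNext")
  }
}
```

In this model three records are buffered and the callback fires once. Because a Scala `Iterator` is consumed by `foreach`, the listener receives it already exhausted, so a listener written against this shape would have to rely on `metadata` rather than re-reading the records from the iterator.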