aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala40
1 files changed, 32 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 794dece370..300e820d01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -155,10 +155,17 @@ private[streaming] class BlockGenerator(
/**
* Push a single data item into the buffer.
*/
- def addData(data: Any): Unit = synchronized {
+ def addData(data: Any): Unit = {
if (state == Active) {
waitToPush()
- currentBuffer += data
+ synchronized {
+ if (state == Active) {
+ currentBuffer += data
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
+ }
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
@@ -169,11 +176,18 @@ private[streaming] class BlockGenerator(
* Push a single data item into the buffer. After buffering the data, the
* `BlockGeneratorListener.onAddData` callback will be called.
*/
- def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
+ def addDataWithCallback(data: Any, metadata: Any): Unit = {
if (state == Active) {
waitToPush()
- currentBuffer += data
- listener.onAddData(data, metadata)
+ synchronized {
+ if (state == Active) {
+ currentBuffer += data
+ listener.onAddData(data, metadata)
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
+ }
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
@@ -185,13 +199,23 @@ private[streaming] class BlockGenerator(
* `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items
* are atomically added to the buffer, and are hence guaranteed to be present in a single block.
*/
- def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized {
+ def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = {
if (state == Active) {
+ // Unroll iterator into a temp buffer, and wait for pushing in the process
+ val tempBuffer = new ArrayBuffer[Any]
dataIterator.foreach { data =>
waitToPush()
- currentBuffer += data
+ tempBuffer += data
+ }
+ synchronized {
+ if (state == Active) {
+ currentBuffer ++= tempBuffer
+ listener.onAddData(tempBuffer, metadata)
+ } else {
+ throw new SparkException(
+ "Cannot add data as BlockGenerator has not been started or has been stopped")
+ }
}
- listener.onAddData(dataIterator, metadata)
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")