aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala29
1 files changed, 19 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 300e820d01..421d60ae35 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -227,16 +227,21 @@ private[streaming] class BlockGenerator(
def isStopped(): Boolean = state == StoppedAll
/** Change the buffer to which single records are added to. */
- private def updateCurrentBuffer(time: Long): Unit = synchronized {
+ private def updateCurrentBuffer(time: Long): Unit = {
try {
- val newBlockBuffer = currentBuffer
- currentBuffer = new ArrayBuffer[Any]
- if (newBlockBuffer.size > 0) {
- val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
- val newBlock = new Block(blockId, newBlockBuffer)
- listener.onGenerateBlock(blockId)
+ var newBlock: Block = null
+ synchronized {
+ if (currentBuffer.nonEmpty) {
+ val newBlockBuffer = currentBuffer
+ currentBuffer = new ArrayBuffer[Any]
+ val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
+ listener.onGenerateBlock(blockId)
+ newBlock = new Block(blockId, newBlockBuffer)
+ }
+ }
+
+ if (newBlock != null) {
blocksForPushing.put(newBlock) // put is blocking when queue is full
- logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
}
} catch {
case ie: InterruptedException =>
@@ -250,9 +255,13 @@ private[streaming] class BlockGenerator(
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
- def isGeneratingBlocks = synchronized { state == Active || state == StoppedAddingData }
+ def areBlocksBeingGenerated: Boolean = synchronized {
+ state != StoppedGeneratingBlocks
+ }
+
try {
- while (isGeneratingBlocks) {
+ // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
+ while (areBlocksBeingGenerated) {
Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>