diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-18 19:26:38 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-18 19:26:38 -0700 |
commit | 1aeae05bb20f01ab7ccaa62fe905a63e020074b5 (patch) | |
tree | 4187423e85b779f0e166e0af99bbc6d09003b0d8 | |
parent | b4b35f133aecaf84f04e8e444b660a33c6b7894a (diff) | |
download | spark-1aeae05bb20f01ab7ccaa62fe905a63e020074b5.tar.gz spark-1aeae05bb20f01ab7ccaa62fe905a63e020074b5.tar.bz2 spark-1aeae05bb20f01ab7ccaa62fe905a63e020074b5.zip |
[SPARK-10072] [STREAMING] BlockGenerator can deadlock when the queue of generate blocks fills up to capacity
Generated blocks are inserted into an ArrayBlockingQueue, and another thread pulls stuff from the ArrayBlockingQueue and pushes it into BlockManager. Now if that queue fills up to capacity (default is 10 blocks), then the inserting into queue (done in the function updateCurrentBuffer) get blocked inside a synchronized block. However, the thread that is pulling blocks from the queue uses the same lock to check the current (active or stopped) while pulling from the queue. Since the block generating threads is blocked (as the queue is full) on the lock, this thread that is supposed to drain the queue gets blocked. Ergo, deadlock.
Solution: Moved blocking call to ArrayBlockingQueue outside the synchronized to prevent deadlock.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #8257 from tdas/SPARK-10072.
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 300e820d01..421d60ae35 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -227,16 +227,21 @@ private[streaming] class BlockGenerator( def isStopped(): Boolean = state == StoppedAll /** Change the buffer to which single records are added to. */ - private def updateCurrentBuffer(time: Long): Unit = synchronized { + private def updateCurrentBuffer(time: Long): Unit = { try { - val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[Any] - if (newBlockBuffer.size > 0) { - val blockId = StreamBlockId(receiverId, time - blockIntervalMs) - val newBlock = new Block(blockId, newBlockBuffer) - listener.onGenerateBlock(blockId) + var newBlock: Block = null + synchronized { + if (currentBuffer.nonEmpty) { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[Any] + val blockId = StreamBlockId(receiverId, time - blockIntervalMs) + listener.onGenerateBlock(blockId) + newBlock = new Block(blockId, newBlockBuffer) + } + } + + if (newBlock != null) { blocksForPushing.put(newBlock) // put is blocking when queue is full - logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => @@ -250,9 +255,13 @@ private[streaming] class BlockGenerator( private def keepPushingBlocks() { logInfo("Started block pushing thread") - def isGeneratingBlocks = synchronized { state == Active || state == StoppedAddingData } + def areBlocksBeingGenerated: Boolean = synchronized { + state != StoppedGeneratingBlocks + } + try { - while (isGeneratingBlocks) { + // While blocks are being generated, keep polling for to-be-pushed blocks and push them. + while (areBlocksBeingGenerated) { Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match { case Some(block) => pushBlock(block) case None => |