author     Tathagata Das <tathagata.das1565@gmail.com>  2015-08-18 19:26:38 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-08-18 19:26:38 -0700
commit     1aeae05bb20f01ab7ccaa62fe905a63e020074b5 (patch)
tree       4187423e85b779f0e166e0af99bbc6d09003b0d8 /streaming
parent     b4b35f133aecaf84f04e8e444b660a33c6b7894a (diff)
[SPARK-10072] [STREAMING] BlockGenerator can deadlock when the queue of generated blocks fills up to capacity
Generated blocks are inserted into an ArrayBlockingQueue, and another thread pulls blocks from that queue and pushes them into the BlockManager. If the queue fills up to capacity (the default is 10 blocks), then the insertion into the queue (done in the method updateCurrentBuffer) gets blocked inside a synchronized block. However, the thread that pulls blocks from the queue takes the same lock to check the current state (active or stopped) before each pull. Since the block-generating thread holds the lock while blocked on the full queue, the thread that is supposed to drain the queue can never acquire it. Ergo, deadlock.

Solution: Moved the blocking call to the ArrayBlockingQueue outside the synchronized block to prevent the deadlock.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8257 from tdas/SPARK-10072.
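To make the failure mode concrete, here is a minimal, self-contained sketch of the same lock/queue interaction. The names (DeadlockSketch, produce, consume, stillGenerating) are illustrative, not BlockGenerator's actual internals; only the shape of the bug is the same: the producer calls the blocking put() while holding the lock, and the consumer needs that same lock for its state check, so once the queue fills, the program hangs.

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object DeadlockSketch {
  private case class Block(id: Int)

  private val queue = new ArrayBlockingQueue[Block](10) // same default capacity: 10 blocks
  private val lock = new Object
  @volatile private var stopped = false

  // Mirrors the old updateCurrentBuffer: the blocking put() runs while `lock` is held.
  private def produce(block: Block): Unit = lock.synchronized {
    queue.put(block) // once the queue is full, this blocks with the lock still held
  }

  // Mirrors keepPushingBlocks: the state check takes the same lock before each poll.
  private def consume(): Unit = {
    def stillGenerating: Boolean = lock.synchronized { !stopped } // waits on `lock`
    while (stillGenerating) {
      Option(queue.poll(10, TimeUnit.MILLISECONDS)).foreach(b => println(s"pushed ${b.id}"))
    }
  }

  def main(args: Array[String]): Unit = {
    new Thread(new Runnable { def run(): Unit = consume() }).start()
    // The producer outruns the consumer; once put() blocks inside synchronized,
    // the consumer can never re-acquire `lock`, and the program hangs.
    (1 to 1000).foreach(i => produce(Block(i)))
  }
}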
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 29
1 file changed, 19 insertions(+), 10 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 300e820d01..421d60ae35 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -227,16 +227,21 @@ private[streaming] class BlockGenerator(
   def isStopped(): Boolean = state == StoppedAll
 
   /** Change the buffer to which single records are added to. */
-  private def updateCurrentBuffer(time: Long): Unit = synchronized {
+  private def updateCurrentBuffer(time: Long): Unit = {
     try {
-      val newBlockBuffer = currentBuffer
-      currentBuffer = new ArrayBuffer[Any]
-      if (newBlockBuffer.size > 0) {
-        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
-        val newBlock = new Block(blockId, newBlockBuffer)
-        listener.onGenerateBlock(blockId)
+      var newBlock: Block = null
+      synchronized {
+        if (currentBuffer.nonEmpty) {
+          val newBlockBuffer = currentBuffer
+          currentBuffer = new ArrayBuffer[Any]
+          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
+          listener.onGenerateBlock(blockId)
+          newBlock = new Block(blockId, newBlockBuffer)
+        }
+      }
+
+      if (newBlock != null) {
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
-        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
@@ -250,9 +255,13 @@ private[streaming] class BlockGenerator(
   private def keepPushingBlocks() {
     logInfo("Started block pushing thread")
 
-    def isGeneratingBlocks = synchronized { state == Active || state == StoppedAddingData }
+    def areBlocksBeingGenerated: Boolean = synchronized {
+      state != StoppedGeneratingBlocks
+    }
+
     try {
-      while (isGeneratingBlocks) {
+      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
+      while (areBlocksBeingGenerated) {
         Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
           case Some(block) => pushBlock(block)
           case None =>
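For reference, the corrected pattern in isolation. This is a minimal sketch with illustrative names (FixedPatternSketch, addData, cutBlock), not Spark's actual API: only the buffer swap happens under the lock, and the potentially blocking put() runs after the lock is released, so a full queue can no longer starve the consumer's state check. The patch itself uses a var with a null check; Option is used here only to keep the sketch idiomatic.

import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

object FixedPatternSketch {
  private val queue = new ArrayBlockingQueue[ArrayBuffer[Any]](10)
  private var currentBuffer = new ArrayBuffer[Any]

  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  def cutBlock(): Unit = {
    // Step 1: swap the buffer while holding the lock (fast, never touches the queue).
    val newBlock: Option[ArrayBuffer[Any]] = synchronized {
      if (currentBuffer.nonEmpty) {
        val buf = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        Some(buf)
      } else None
    }
    // Step 2: the blocking call happens outside the lock, so a full queue
    // can no longer prevent another thread from acquiring the lock.
    newBlock.foreach(b => queue.put(b))
  }
}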