From ed2de0299a5a54b566b91ae9f47b6626c484c1d3 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 18 Apr 2016 19:36:40 -0700
Subject: [SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors

WriteAheadLogBasedBlockHandler currently throws an exception if its BlockManager `put()` call fails, even though that call is performed only as a performance optimization. Instead, it should log and ignore any exception raised during that `put()`.

This is a longstanding issue that was masked by an incorrect test case. I think we haven't noticed it in production because 1. most people probably use the `MEMORY_AND_DISK` storage level, and 2. individual blocks are typically small enough relative to the total storage memory that they can evict blocks from previous batches, so `put()` failures here may be rare in practice.

This patch fixes both the faulty test and the bug.

/cc tdas

Author: Josh Rosen

Closes #12484 from JoshRosen/received-block-hadndler-fix.
---
 .../streaming/receiver/ReceivedBlockHandler.scala | 22 ++++++----
 .../streaming/ReceivedBlockHandlerSuite.scala     | 49 ++++++++++++----------
 2 files changed, 40 insertions(+), 31 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 7aea1c9b64..f381fa4094 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.language.{existentials, postfixOps}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -189,14 +190,19 @@
 
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
-      val putSucceeded = blockManager.putBytes(
-        blockId,
-        serializedBlock,
-        effectiveStorageLevel,
-        tellMaster = true)
-      if (!putSucceeded) {
-        throw new SparkException(
-          s"Could not store $blockId to block manager with storage level $storageLevel")
+      try {
+        val putSucceeded = blockManager.putBytes(
+          blockId,
+          serializedBlock,
+          effectiveStorageLevel,
+          tellMaster = true)
+        if (!putSucceeded) {
+          logWarning(
+            s"Could not store $blockId to block manager with storage level $storageLevel")
+        }
+      } catch {
+        case NonFatal(t) =>
+          logError(s"Could not store $blockId to block manager with storage level $storageLevel", t)
       }
     }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 4be4882938..ea87b0d59f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -127,7 +127,17 @@ class ReceivedBlockHandlerSuite
 
   test("BlockManagerBasedBlockHandler - handle errors in storing block") {
     withBlockManagerBasedBlockHandler { handler =>
-      testErrorHandling(handler)
+      // Handle error in iterator (e.g. divide-by-zero error)
+      intercept[Exception] {
+        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+      }
+
+      // Handle error in block manager storing (e.g. too big block)
+      intercept[SparkException] {
+        val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+        handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+      }
     }
   }
 
@@ -167,7 +177,15 @@ class ReceivedBlockHandlerSuite
 
   test("WriteAheadLogBasedBlockHandler - handle errors in storing block") {
     withWriteAheadLogBasedBlockHandler { handler =>
-      testErrorHandling(handler)
+      // Handle error in iterator (e.g. divide-by-zero error)
+      intercept[Exception] {
+        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+      }
+
+      // Throws no errors when storing blocks that are too large to be cached
+      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+      handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
     }
   }
 
@@ -204,26 +222,26 @@ class ReceivedBlockHandlerSuite
     sparkConf.set("spark.storage.unrollFraction", "0.4")
     // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
     blockManager = createBlockManager(12000, sparkConf)
+    // This block is way too large to possibly be cached in memory:
+    def hugeBlock: IteratorBlock = IteratorBlock(List.fill(100)(new Array[Byte](1000)).iterator)
 
     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to WAL
     // and hence count returns correct value.
-    testRecordcount(false, StorageLevel.MEMORY_ONLY,
-      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+    testRecordcount(false, StorageLevel.MEMORY_ONLY, hugeBlock, blockManager, Some(100))
 
     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to DISK
     // and hence count returns correct value.
-    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
-      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK, hugeBlock, blockManager, Some(100))
 
     // there is not enough space to store this block With MEMORY_ONLY StorageLevel.
     // BlockManager will not be able to unroll this block
     // and hence it will not tryToPut this block, resulting the SparkException
     storageLevel = StorageLevel.MEMORY_ONLY
     withBlockManagerBasedBlockHandler { handler =>
-      val thrown = intercept[SparkException] {
-        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      intercept[SparkException] {
+        storeSingleBlock(handler, hugeBlock)
       }
     }
   }
 
@@ -346,21 +364,6 @@ class ReceivedBlockHandlerSuite
     storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
   }
 
-  /** Test error handling when blocks that cannot be stored */
-  private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) {
-    // Handle error in iterator (e.g. divide-by-zero error)
-    intercept[Exception] {
-      val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
-      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
-    }
-
-    // Handler error in block manager storing (e.g. too big block)
-    intercept[SparkException] {
-      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
-      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
-    }
-  }
-
   /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */
   private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) {
     body(new BlockManagerBasedBlockHandler(blockManager, storageLevel))
-- 
cgit v1.2.3
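
A note for readers studying the change: after this patch, the handler still runs the BlockManager put and the write-ahead-log write in parallel, but only a failure of the WAL write can fail the store; the put is best-effort. What follows is a minimal, self-contained Scala sketch of that shape, not Spark code: bestEffortCachePut, requiredDurableWrite, and the println-based logging are stand-ins invented for this illustration, and only the catch-NonFatal-and-log structure mirrors the patch.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object BestEffortStoreSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for the BlockManager put and the WAL write.
  def bestEffortCachePut(): Boolean = sys.error("simulated cache failure")
  def requiredDurableWrite(): Unit = println("written to the write-ahead log")

  def store(): Unit = {
    // Best-effort path: failures are logged and swallowed, since the
    // cache write is only a performance optimization.
    val cacheFuture = Future {
      try {
        if (!bestEffortCachePut()) {
          println("WARN: cache put reported failure; continuing")
        }
      } catch {
        case NonFatal(t) => println(s"ERROR: cache put failed, ignoring: $t")
      }
    }
    // Required path: an exception here still fails the whole store.
    val walFuture = Future { requiredDurableWrite() }
    // Only a failure of the required write surfaces to the caller.
    Await.result(cacheFuture.zip(walFuture), 30.seconds)
  }

  def main(args: Array[String]): Unit = store()
}

Running main logs the simulated cache failure yet completes normally; if requiredDurableWrite is changed to throw, Await.result rethrows. That asymmetry is what the rewritten WriteAheadLogBasedBlockHandler test above asserts when it stores an oversized block without expecting an exception.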