From a685e65a4ca0b300b12103fccbda29cb08221f5d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 19 Apr 2016 16:15:06 -0700 Subject: Revert "[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors" This reverts commit ed2de0299a5a54b566b91ae9f47b6626c484c1d3. --- .../streaming/receiver/ReceivedBlockHandler.scala | 22 ++++------ .../streaming/ReceivedBlockHandlerSuite.scala | 49 ++++++++++------------ 2 files changed, 31 insertions(+), 40 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index a7d870500f..c4bc5cf3f6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -20,7 +20,6 @@ package org.apache.spark.streaming.receiver import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.language.{existentials, postfixOps} -import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -190,19 +189,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in block manager val storeInBlockManagerFuture = Future { - try { - val putSucceeded = blockManager.putBytes( - blockId, - serializedBlock, - effectiveStorageLevel, - tellMaster = true) - if (!putSucceeded) { - logWarning( - s"Could not store $blockId to block manager with storage level $storageLevel") - } - } catch { - case NonFatal(t) => - logError(s"Could not store $blockId to block manager with storage level $storageLevel", t) + val putSucceeded = blockManager.putBytes( + blockId, + serializedBlock, + effectiveStorageLevel, + tellMaster = true) + if (!putSucceeded) { + throw new SparkException( + s"Could not store $blockId to block manager with storage level $storageLevel") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index ea87b0d59f..4be4882938 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -127,17 +127,7 @@ class ReceivedBlockHandlerSuite test("BlockManagerBasedBlockHandler - handle errors in storing block") { withBlockManagerBasedBlockHandler { handler => - // Handle error in iterator (e.g. divide-by-zero error) - intercept[Exception] { - val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 } - handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator)) - } - - // Handler error in block manager storing (e.g. too big block) - intercept[SparkException] { - val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1)) - handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer)) - } + testErrorHandling(handler) } } @@ -177,15 +167,7 @@ class ReceivedBlockHandlerSuite test("WriteAheadLogBasedBlockHandler - handle errors in storing block") { withWriteAheadLogBasedBlockHandler { handler => - // Handle error in iterator (e.g. divide-by-zero error) - intercept[Exception] { - val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 } - handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator)) - } - - // Throws no errors when storing blocks that are too large to be cached - val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1)) - handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer)) + testErrorHandling(handler) } } @@ -222,26 +204,26 @@ class ReceivedBlockHandlerSuite sparkConf.set("spark.storage.unrollFraction", "0.4") // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll blockManager = createBlockManager(12000, sparkConf) - // This block is way too large to possibly be cached in memory: - def hugeBlock: IteratorBlock = IteratorBlock(List.fill(100)(new Array[Byte](1000)).iterator) // there is not enough space to store this block in MEMORY, // But BlockManager will be able to serialize this block to WAL // and hence count returns correct value. - testRecordcount(false, StorageLevel.MEMORY_ONLY, hugeBlock, blockManager, Some(100)) + testRecordcount(false, StorageLevel.MEMORY_ONLY, + IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) // there is not enough space to store this block in MEMORY, // But BlockManager will be able to serialize this block to DISK // and hence count returns correct value. - testRecordcount(true, StorageLevel.MEMORY_AND_DISK, hugeBlock, blockManager, Some(100)) + testRecordcount(true, StorageLevel.MEMORY_AND_DISK, + IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) // there is not enough space to store this block With MEMORY_ONLY StorageLevel. // BlockManager will not be able to unroll this block // and hence it will not tryToPut this block, resulting the SparkException storageLevel = StorageLevel.MEMORY_ONLY withBlockManagerBasedBlockHandler { handler => - intercept[SparkException] { - storeSingleBlock(handler, hugeBlock) + val thrown = intercept[SparkException] { + storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator)) } } } @@ -364,6 +346,21 @@ class ReceivedBlockHandlerSuite storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) }) } + /** Test error handling when blocks that cannot be stored */ + private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) { + // Handle error in iterator (e.g. divide-by-zero error) + intercept[Exception] { + val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 } + receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator)) + } + + // Handler error in block manager storing (e.g. too big block) + intercept[SparkException] { + val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1)) + receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer)) + } + } + /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */ private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) { body(new BlockManagerBasedBlockHandler(blockManager, storageLevel)) -- cgit v1.2.3