aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-04-19 16:15:06 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-04-19 16:15:06 -0700
commita685e65a4ca0b300b12103fccbda29cb08221f5d (patch)
treef89fd294ce5ca49a474609eb712d25fbfdcc31f2 /streaming/src
parentecd877e8335ff6bb06c96d3045ccade80676e714 (diff)
downloadspark-a685e65a4ca0b300b12103fccbda29cb08221f5d.tar.gz
spark-a685e65a4ca0b300b12103fccbda29cb08221f5d.tar.bz2
spark-a685e65a4ca0b300b12103fccbda29cb08221f5d.zip
Revert "[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors"
This reverts commit ed2de0299a5a54b566b91ae9f47b6626c484c1d3.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala22
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala49
2 files changed, 31 insertions, 40 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index a7d870500f..c4bc5cf3f6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.receiver
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
-import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -190,19 +189,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Store the block in block manager
val storeInBlockManagerFuture = Future {
- try {
- val putSucceeded = blockManager.putBytes(
- blockId,
- serializedBlock,
- effectiveStorageLevel,
- tellMaster = true)
- if (!putSucceeded) {
- logWarning(
- s"Could not store $blockId to block manager with storage level $storageLevel")
- }
- } catch {
- case NonFatal(t) =>
- logError(s"Could not store $blockId to block manager with storage level $storageLevel", t)
+ val putSucceeded = blockManager.putBytes(
+ blockId,
+ serializedBlock,
+ effectiveStorageLevel,
+ tellMaster = true)
+ if (!putSucceeded) {
+ throw new SparkException(
+ s"Could not store $blockId to block manager with storage level $storageLevel")
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index ea87b0d59f..4be4882938 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -127,17 +127,7 @@ class ReceivedBlockHandlerSuite
test("BlockManagerBasedBlockHandler - handle errors in storing block") {
withBlockManagerBasedBlockHandler { handler =>
- // Handle error in iterator (e.g. divide-by-zero error)
- intercept[Exception] {
- val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
- handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
- }
-
- // Handler error in block manager storing (e.g. too big block)
- intercept[SparkException] {
- val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
- handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
- }
+ testErrorHandling(handler)
}
}
@@ -177,15 +167,7 @@ class ReceivedBlockHandlerSuite
test("WriteAheadLogBasedBlockHandler - handle errors in storing block") {
withWriteAheadLogBasedBlockHandler { handler =>
- // Handle error in iterator (e.g. divide-by-zero error)
- intercept[Exception] {
- val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
- handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
- }
-
- // Throws no errors when storing blocks that are too large to be cached
- val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
- handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+ testErrorHandling(handler)
}
}
@@ -222,26 +204,26 @@ class ReceivedBlockHandlerSuite
sparkConf.set("spark.storage.unrollFraction", "0.4")
// Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
blockManager = createBlockManager(12000, sparkConf)
- // This block is way too large to possibly be cached in memory:
- def hugeBlock: IteratorBlock = IteratorBlock(List.fill(100)(new Array[Byte](1000)).iterator)
// there is not enough space to store this block in MEMORY,
// But BlockManager will be able to serialize this block to WAL
// and hence count returns correct value.
- testRecordcount(false, StorageLevel.MEMORY_ONLY, hugeBlock, blockManager, Some(100))
+ testRecordcount(false, StorageLevel.MEMORY_ONLY,
+ IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
// there is not enough space to store this block in MEMORY,
// But BlockManager will be able to serialize this block to DISK
// and hence count returns correct value.
- testRecordcount(true, StorageLevel.MEMORY_AND_DISK, hugeBlock, blockManager, Some(100))
+ testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+ IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
// there is not enough space to store this block With MEMORY_ONLY StorageLevel.
// BlockManager will not be able to unroll this block
// and hence it will not tryToPut this block, resulting the SparkException
storageLevel = StorageLevel.MEMORY_ONLY
withBlockManagerBasedBlockHandler { handler =>
- intercept[SparkException] {
- storeSingleBlock(handler, hugeBlock)
+ val thrown = intercept[SparkException] {
+ storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
}
}
}
@@ -364,6 +346,21 @@ class ReceivedBlockHandlerSuite
storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
}
+ /** Test error handling when blocks that cannot be stored */
+ private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) {
+ // Handle error in iterator (e.g. divide-by-zero error)
+ intercept[Exception] {
+ val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+ receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+ }
+
+ // Handler error in block manager storing (e.g. too big block)
+ intercept[SparkException] {
+ val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+ receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+ }
+ }
+
/** Instantiate a BlockManagerBasedBlockHandler and run a code with it */
private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) {
body(new BlockManagerBasedBlockHandler(blockManager, storageLevel))