author     Josh Rosen <joshrosen@databricks.com>    2016-04-18 19:36:40 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2016-04-18 19:36:40 -0700
commit     ed2de0299a5a54b566b91ae9f47b6626c484c1d3 (patch)
tree       36858c6e740c9867259856cf8a90c6d381f4b88f /streaming/src/main/scala/org
parent     5e92583d38e11d39deb429a39725443111205a4a (diff)
[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors
WriteAheadLogBasedBlockHandler will currently throw exceptions if its BlockManager `put()` calls fail, even though those calls are only performed as a performance optimization. Instead, it should log and ignore exceptions during that `put()`. This is a longstanding issue that was masked by an incorrect test case.

I think that we haven't noticed this in production because 1. most people probably use a `MEMORY_AND_DISK` storage level, and 2. typically, individual blocks may be small enough relative to the total storage memory such that they're able to evict blocks from previous batches, so `put()` failures here may be rare in practice.

This patch fixes the faulty test and fixes the bug.

/cc tdas

Author: Josh Rosen <joshrosen@databricks.com>

Closes #12484 from JoshRosen/received-block-hadndler-fix.
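The fix below follows a "best-effort cache write" pattern: because the write-ahead log is the durable copy, a failed BlockManager `put()` should only be logged, never rethrown. A minimal standalone sketch of that pattern (using hypothetical names `storeBestEffort`/`put`, with `println` standing in for Spark's logging) looks like this:

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of the best-effort put pattern from this patch.
// The write itself is supplied as a thunk; a `false` result or a
// non-fatal exception is logged and swallowed, and the method reports
// success as a Boolean instead of propagating a failure.
object BestEffortPut {
  def storeBestEffort(blockId: String, put: () => Boolean): Boolean = {
    try {
      val putSucceeded = put()
      if (!putSucceeded) {
        // Mirrors the patch's logWarning for a clean-but-failed put
        println(s"WARN: could not store $blockId in block manager")
      }
      putSucceeded
    } catch {
      case NonFatal(t) =>
        // Mirrors the patch's logError: record the cause, do not rethrow
        println(s"ERROR: could not store $blockId in block manager: ${t.getMessage}")
        false
    }
  }
}
```

`NonFatal` deliberately excludes errors like `OutOfMemoryError` and `InterruptedException`, so truly fatal conditions still propagate; only recoverable storage failures are ignored.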
Diffstat (limited to 'streaming/src/main/scala/org')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 22
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 7aea1c9b64..f381fa4094 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
+import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -189,14 +190,19 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Store the block in block manager
val storeInBlockManagerFuture = Future {
- val putSucceeded = blockManager.putBytes(
- blockId,
- serializedBlock,
- effectiveStorageLevel,
- tellMaster = true)
- if (!putSucceeded) {
- throw new SparkException(
- s"Could not store $blockId to block manager with storage level $storageLevel")
+ try {
+ val putSucceeded = blockManager.putBytes(
+ blockId,
+ serializedBlock,
+ effectiveStorageLevel,
+ tellMaster = true)
+ if (!putSucceeded) {
+ logWarning(
+ s"Could not store $blockId to block manager with storage level $storageLevel")
+ }
+ } catch {
+ case NonFatal(t) =>
+ logError(s"Could not store $blockId to block manager with storage level $storageLevel", t)
}
}