author     Josh Rosen <joshrosen@databricks.com>    2016-01-18 13:34:12 -0800
committer  Andrew Or <andrew@databricks.com>        2016-01-18 13:34:12 -0800
commit     b8cb548a4394221f2b029c84c7f130774da69e3a (patch)
tree       a9c08ccd7dc2fe0e9a060a5ea3d617012760b3d7 /streaming/src/main
parent     302bb569f3e1f09e2e7cea5e4e7f5c6953b0fc82 (diff)
download   spark-b8cb548a4394221f2b029c84c7f130774da69e3a.tar.gz
           spark-b8cb548a4394221f2b029c84c7f130774da69e3a.tar.bz2
           spark-b8cb548a4394221f2b029c84c7f130774da69e3a.zip
[SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager
This patch refactors portions of the BlockManager and CacheManager in order to avoid having to pass `evictedBlocks` lists throughout the code. It appears that these lists were only consumed by `TaskContext.taskMetrics`, so the new code now directly updates the metrics from the lower-level BlockManager methods.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10776 from JoshRosen/SPARK-10985.
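As a rough illustration of the pattern the message describes, the sketch below (hypothetical names, not the actual Spark BlockManager or TaskMetrics API) contrasts the old shape, where a put returned the evicted/updated block statuses for each caller to thread into `TaskContext.taskMetrics`, with the new shape, where the put reports a success flag and the block manager records the status updates itself:

    // Hypothetical sketch only; names and signatures are illustrative,
    // not the real Spark API.
    case class BlockId(name: String)
    case class BlockStatus(memSize: Long, diskSize: Long)

    class TaskMetrics {
      private var updatedBlocks = Seq.empty[(BlockId, BlockStatus)]
      def incUpdatedBlockStatuses(statuses: Seq[(BlockId, BlockStatus)]): Unit = {
        updatedBlocks ++= statuses
      }
    }

    object TaskContext {
      // Stand-in for the per-task metrics the real TaskContext exposes.
      val taskMetrics: TaskMetrics = new TaskMetrics
    }

    class BlockManager {
      // Before: a put returned the evicted/updated block statuses and every
      // caller had to forward them into the task metrics, e.g.
      //   def putBytes(id: BlockId, bytes: Array[Byte]): Seq[(BlockId, BlockStatus)]

      // After: the block manager records the status updates internally and
      // the caller only sees whether the put succeeded.
      def putBytes(id: BlockId, bytes: Array[Byte]): Boolean = {
        val status = BlockStatus(memSize = bytes.length.toLong, diskSize = 0L)
        TaskContext.taskMetrics.incUpdatedBlockStatuses(Seq(id -> status))
        true
      }
    }

    object Example extends App {
      val bm = new BlockManager
      val ok = bm.putBytes(BlockId("input-0-0"), Array[Byte](1, 2, 3))
      if (!ok) {
        throw new RuntimeException("Could not store block input-0-0")
      }
    }

This is the same caller-side pattern the diff below adopts in ReceivedBlockHandler: check a boolean instead of scanning the returned status list for the block id.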
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala  8
1 file changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index faa5aca1d8..e22e320b17 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -71,7 +71,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
var numRecords: Option[Long] = None
- val putResult: Seq[(BlockId, BlockStatus)] = block match {
+ val putSucceeded: Boolean = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
@@ -88,7 +88,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
}
- if (!putResult.map { _._1 }.contains(blockId)) {
+ if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
@@ -184,9 +184,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Store the block in block manager
val storeInBlockManagerFuture = Future {
- val putResult =
+ val putSucceeded =
blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
- if (!putResult.map { _._1 }.contains(blockId)) {
+ if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}