From b8cb548a4394221f2b029c84c7f130774da69e3a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 18 Jan 2016 13:34:12 -0800 Subject: [SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager This patch refactors portions of the BlockManager and CacheManager in order to avoid having to pass `evictedBlocks` lists throughout the code. It appears that these lists were only consumed by `TaskContext.taskMetrics`, so the new code now directly updates the metrics from the lower-level BlockManager methods. Author: Josh Rosen Closes #10776 from JoshRosen/SPARK-10985. --- .../apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'streaming/src/main') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index faa5aca1d8..e22e320b17 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -71,7 +71,7 @@ private[streaming] class BlockManagerBasedBlockHandler( var numRecords: Option[Long] = None - val putResult: Seq[(BlockId, BlockStatus)] = block match { + val putSucceeded: Boolean = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, @@ -88,7 +88,7 @@ private[streaming] class BlockManagerBasedBlockHandler( throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") } - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } @@ -184,9 +184,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in block manager val storeInBlockManagerFuture = Future { - val putResult = + val putSucceeded = blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } -- cgit v1.2.3