diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-01-18 13:34:12 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-01-18 13:34:12 -0800 |
commit | b8cb548a4394221f2b029c84c7f130774da69e3a (patch) | |
tree | a9c08ccd7dc2fe0e9a060a5ea3d617012760b3d7 /streaming/src | |
parent | 302bb569f3e1f09e2e7cea5e4e7f5c6953b0fc82 (diff) | |
download | spark-b8cb548a4394221f2b029c84c7f130774da69e3a.tar.gz spark-b8cb548a4394221f2b029c84c7f130774da69e3a.tar.bz2 spark-b8cb548a4394221f2b029c84c7f130774da69e3a.zip |
[SPARK-10985][CORE] Avoid passing evicted blocks throughout BlockManager
This patch refactors portions of the BlockManager and CacheManager in order to avoid having to pass `evictedBlocks` lists throughout the code. It appears that these lists were only consumed by `TaskContext.taskMetrics`, so the new code now directly updates the metrics from the lower-level BlockManager methods.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #10776 from JoshRosen/SPARK-10985.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index faa5aca1d8..e22e320b17 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -71,7 +71,7 @@ private[streaming] class BlockManagerBasedBlockHandler( var numRecords: Option[Long] = None - val putResult: Seq[(BlockId, BlockStatus)] = block match { + val putSucceeded: Boolean = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, @@ -88,7 +88,7 @@ private[streaming] class BlockManagerBasedBlockHandler( throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") } - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } @@ -184,9 +184,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in block manager val storeInBlockManagerFuture = Future { - val putResult = + val putSucceeded = blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } |