diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2015-04-22 21:42:09 -0700 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2015-04-22 21:42:09 -0700 |
commit | 03e85b4a11899f37424cd6e1f8d71f1d704c90bb (patch) | |
tree | 03d95c46c99af18128559a152de01a8e60dad874 /core | |
parent | d20686066e978dd12e618e3978f109f05bc412fe (diff) | |
download | spark-03e85b4a11899f37424cd6e1f8d71f1d704c90bb.tar.gz spark-03e85b4a11899f37424cd6e1f8d71f1d704c90bb.tar.bz2 spark-03e85b4a11899f37424cd6e1f8d71f1d704c90bb.zip |
[SPARK-7046] Remove InputMetrics from BlockResult
This is a code cleanup.
The BlockResult class originally contained an InputMetrics object so that InputMetrics could
directly be used as the InputMetrics for the whole task. Now we copy the fields out of here, and
the presence of this object is confusing because it's only a partial input metrics (it doesn't
include the records read). Because this object is no longer useful (and is confusing), it should
be removed.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #5627 from kayousterhout/SPARK-7046 and squashes the following commits:
bf64bbe [Kay Ousterhout] Import fix
a08ca19 [Kay Ousterhout] [SPARK-7046] Remove InputMetrics from BlockResult
Diffstat (limited to 'core')
3 files changed, 11 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index a96d754744..4d20c73693 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { blockManager.get(key) match { case Some(blockResult) => // Partition is already materialized, so just return its values - val inputMetrics = blockResult.inputMetrics val existingMetrics = context.taskMetrics - .getInputMetricsForReadMethod(inputMetrics.readMethod) - existingMetrics.incBytesRead(inputMetrics.bytesRead) + .getInputMetricsForReadMethod(blockResult.readMethod) + existingMetrics.incBytesRead(blockResult.bytes) val iter = blockResult.data.asInstanceOf[Iterator[T]] new InterruptibleIterator[T](context, iter) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 145a9c1ae3..55718e584c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,7 +29,7 @@ import scala.util.Random import sun.nio.ch.DirectBuffer import org.apache.spark._ -import org.apache.spark.executor._ +import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} @@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( val data: Iterator[Any], - readMethod: DataReadMethod.Value, - bytes: Long) { - val inputMetrics = new InputMetrics(readMethod) - inputMetrics.incBytesRead(bytes) -} + val readMethod: DataReadMethod.Value, + val bytes: Long) /** * Manager running on every node (driver and executors) which provides interfaces for putting and diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 545722b050..7d82a7c66a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -428,19 +428,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val list1Get = store.get("list1") assert(list1Get.isDefined, "list1 expected to be in store") assert(list1Get.get.data.size === 2) - assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate) - assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory) + assert(list1Get.get.bytes === list1SizeEstimate) + assert(list1Get.get.readMethod === DataReadMethod.Memory) val list2MemoryGet = store.get("list2memory") assert(list2MemoryGet.isDefined, "list2memory expected to be in store") assert(list2MemoryGet.get.data.size === 3) - assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate) - assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory) + assert(list2MemoryGet.get.bytes === list2SizeEstimate) + assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory) val list2DiskGet = store.get("list2disk") assert(list2DiskGet.isDefined, "list2memory expected to be in store") assert(list2DiskGet.get.data.size === 3) // We don't know the exact size of the data on disk, but it should certainly be > 0. - assert(list2DiskGet.get.inputMetrics.bytesRead > 0) - assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk) + assert(list2DiskGet.get.bytes > 0) + assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } test("in-memory LRU storage") { |