author    Kay Ousterhout <kayousterhout@gmail.com>  2015-04-22 21:42:09 -0700
committer Kay Ousterhout <kayousterhout@gmail.com>  2015-04-22 21:42:09 -0700
commit    03e85b4a11899f37424cd6e1f8d71f1d704c90bb (patch)
tree      03d95c46c99af18128559a152de01a8e60dad874 /core
parent    d20686066e978dd12e618e3978f109f05bc412fe (diff)
[SPARK-7046] Remove InputMetrics from BlockResult
This is a code cleanup. The BlockResult class originally contained an InputMetrics object so that it could be used directly as the InputMetrics for the whole task. Now we copy the fields out of it, and the presence of this object is confusing because it holds only partial input metrics (it does not include the records read). Because this object is no longer useful (and is confusing), it should be removed.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #5627 from kayousterhout/SPARK-7046 and squashes the following commits:

bf64bbe [Kay Ousterhout] Import fix
a08ca19 [Kay Ousterhout] [SPARK-7046] Remove InputMetrics from BlockResult
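In short, the patch turns BlockResult from a class that eagerly allocates a partial InputMetrics into a plain holder of three public vals. The shape of the change, extracted from the diff below for readability:

    // Before: every fetched block allocated an InputMetrics that held only
    // the read method and bytes read (no records read).
    private[spark] class BlockResult(
        val data: Iterator[Any],
        readMethod: DataReadMethod.Value,
        bytes: Long) {
      val inputMetrics = new InputMetrics(readMethod)
      inputMetrics.incBytesRead(bytes)
    }

    // After: the same information is exposed directly as vals; callers fold
    // it into the task-level InputMetrics themselves.
    private[spark] class BlockResult(
        val data: Iterator[Any],
        val readMethod: DataReadMethod.Value,
        val bytes: Long)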
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala              |  5 ++---
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala      |  9 +++------
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 12 ++++++------
3 files changed, 11 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index a96d754744..4d20c73693 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.incBytesRead(inputMetrics.bytesRead)
+          .getInputMetricsForReadMethod(blockResult.readMethod)
+        existingMetrics.incBytesRead(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
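With the wrapper gone, callers read the two vals directly, as the hunk above shows. A minimal sketch of consuming them (a hypothetical helper, not part of this patch; DataReadMethod is the existing enumeration in org.apache.spark.executor, and since BlockResult is private[spark], this would have to live inside the org.apache.spark package):

    import org.apache.spark.executor.DataReadMethod
    import org.apache.spark.storage.BlockResult

    // Hypothetical helper: summarize where a cached block came from and how
    // large it was, using the vals BlockResult now exposes directly.
    def describeBlock(result: BlockResult): String = result.readMethod match {
      case DataReadMethod.Memory => s"read ${result.bytes} bytes from memory"
      case DataReadMethod.Disk   => s"read ${result.bytes} bytes from disk"
      case other                 => s"read ${result.bytes} bytes via $other"
    }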
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 145a9c1ae3..55718e584c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import scala.util.Random
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
-import org.apache.spark.executor._
+import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
-    readMethod: DataReadMethod.Value,
-    bytes: Long) {
-  val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
-}
+    val readMethod: DataReadMethod.Value,
+    val bytes: Long)
 
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
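Constructing a BlockResult is now a plain three-argument call with no metrics object allocated as a side effect. A sketch under assumed inputs (the values and size below are stand-ins, not taken from this patch):

    // Hypothetical call site in the style of BlockManager.get: wrap the cached
    // values and record how they were read. No InputMetrics is created here.
    val values: Seq[Any] = Seq("a", "b", "c")   // stand-in block contents
    val estimatedSize: Long = 120L              // stand-in size estimate
    val result = new BlockResult(values.iterator, DataReadMethod.Memory, estimatedSize)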
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 545722b050..7d82a7c66a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -428,19 +428,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
-    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
-    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list1Get.get.bytes === list1SizeEstimate)
+    assert(list1Get.get.readMethod === DataReadMethod.Memory)
     val list2MemoryGet = store.get("list2memory")
     assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
     assert(list2MemoryGet.get.data.size === 3)
-    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
-    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list2MemoryGet.get.bytes === list2SizeEstimate)
+    assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2disk expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
-    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
-    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+    assert(list2DiskGet.get.bytes > 0)
+    assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
   test("in-memory LRU storage") {