aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-06-29 22:01:42 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2014-06-29 22:01:42 -0700
commit7b71a0e09622e09285a9884ebb67b5fb1c5caa53 (patch)
tree8a6480848bd9d3ed6cbf241d2d07df775eb4dd66 /core/src/test
parentcdf613fc52d7057555a2dbf2241ce90bacbabb4a (diff)
downloadspark-7b71a0e09622e09285a9884ebb67b5fb1c5caa53.tar.gz
spark-7b71a0e09622e09285a9884ebb67b5fb1c5caa53.tar.bz2
spark-7b71a0e09622e09285a9884ebb67b5fb1c5caa53.zip
[SPARK-1683] Track task read metrics.
This commit adds a new metric in TaskMetrics to record the input data size and displays this information in the UI. An earlier version of this commit also added the read time, which can be useful for diagnosing straggler problems, but unfortunately that change introduced a significant performance regression for jobs that don't do much computation. In order to track read time, we'll need to do sampling. The screenshots below show the UI with the new "Input" field, which I added to the stage summary page, the executor summary page, and the per-stage page. ![image](https://cloud.githubusercontent.com/assets/1108612/3167930/2627f92a-eb77-11e3-861c-98ea5bb7a1a2.png) ![image](https://cloud.githubusercontent.com/assets/1108612/3167936/475a889c-eb77-11e3-9706-f11c48751f17.png) ![image](https://cloud.githubusercontent.com/assets/1108612/3167948/80ebcf12-eb77-11e3-87ed-349fce6a770c.png) Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #962 from kayousterhout/read_metrics and squashes the following commits: f13b67d [Kay Ousterhout] Correctly format input bytes on executor page 8b70cde [Kay Ousterhout] Added comment about potential inaccuracy of bytesRead d1016e8 [Kay Ousterhout] Udated SparkListenerSuite test 8461492 [Kay Ousterhout] Miniscule style fix ae04d99 [Kay Ousterhout] Remove input metrics for parallel collections 719f19d [Kay Ousterhout] Style fixes bb6ec62 [Kay Ousterhout] Small fixes 869ac7b [Kay Ousterhout] Updated Json tests 44a0301 [Kay Ousterhout] Fixed accidentally added line 4bd0568 [Kay Ousterhout] Added input source, renamed Hdfs to Hadoop. f27e535 [Kay Ousterhout] Updates based on review comments and to fix rebase bf41029 [Kay Ousterhout] Updated Json tests to pass 0fc33e0 [Kay Ousterhout] Added explicit backward compatibility test 4e52925 [Kay Ousterhout] Added Json output and associated tests. 365400b [Kay Ousterhout] [SPARK-1683] Track task read metrics.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/CacheManagerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala84
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala102
4 files changed, 153 insertions, 38 deletions
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4f178db40f..7f5d0b061e 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
test("get cached rdd") {
expecting {
- blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+ val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+ blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
}
whenExecuting(blockManager) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a08096..71f48e295e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
+ taskMetrics.inputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe5164b..23cb6905bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.executor.DataReadMethod
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.language.postfixOps
@@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
}
+ test("correct BlockResult returned from get() calls") {
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
+ mapOutputTracker)
+ val list1 = List(new Array[Byte](200), new Array[Byte](200))
+ val list1ForSizeEstimate = new ArrayBuffer[Any]
+ list1ForSizeEstimate ++= list1.iterator
+ val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
+ val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
+ val list2ForSizeEstimate = new ArrayBuffer[Any]
+ list2ForSizeEstimate ++= list2.iterator
+ val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val list1Get = store.get("list1")
+ assert(list1Get.isDefined, "list1 expected to be in store")
+ assert(list1Get.get.data.size === 2)
+ assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+ assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2MemoryGet = store.get("list2memory")
+ assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
+ assert(list2MemoryGet.get.data.size === 3)
+ assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+ assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2DiskGet = store.get("list2disk")
+ assert(list2DiskGet.isDefined, "list2memory expected to be in store")
+ assert(list2DiskGet.get.data.size === 3)
+ System.out.println(list2DiskGet)
+ // We don't know the exact size of the data on disk, but it should certainly be > 0.
+ assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+ assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+ }
+
test("in-memory LRU storage") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)
@@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3").isDefined, "list3 was not in store")
- assert(store.get("list3").get.size == 2)
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
// At this point list2 was gotten last, so LRU will getSingle rid of list3
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1").isDefined, "list1 was not in store")
- assert(store.get("list1").get.size == 2)
+ assert(store.get("list1").get.data.size === 2)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3") === None, "list1 was in store")
}
@@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val listForSizeEstimate = new ArrayBuffer[Any]
+ listForSizeEstimate ++= list1.iterator
+ val listSize = SizeEstimator.estimate(listForSizeEstimate)
// At this point LRU should not kick in because list3 is only on disk
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list4").isDefined, "list4 was not in store")
- assert(store.get("list4").get.size === 2)
+ assert(store.get("list4").get.data.size === 2)
}
test("negative byte values in ByteBufferInputStream") {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6c49870455..316e14100e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
val taskGettingResult =
SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
- makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+ val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
testEvent(taskStart, taskStartJsonString)
testEvent(taskGettingResult, taskGettingResultJsonString)
testEvent(taskEnd, taskEndJsonString)
+ testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
testEvent(jobStart, jobStartJsonString)
testEvent(jobEnd, jobEndJsonString)
testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
- testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
+ testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
// StorageLevel
@@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}
- test("Backward compatibility") {
+ test("StageInfo.details backward compatibility") {
// StageInfo.details was added after 1.0.0.
val info = makeStageInfo(1, 2, 3, 4L, 5L)
assert(info.details.nonEmpty)
@@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite {
assert("" === newInfo.details)
}
+ test("InputMetrics backward compatibility") {
+ // InputMetrics were added after 1.0.1.
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+ assert(metrics.inputMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.inputMetrics.isEmpty)
+ }
+
/** -------------------------- *
| Helper test running methods |
@@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite {
metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
assertOptionEquals(
metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
+ assertOptionEquals(
+ metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
}
@@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite {
assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
}
+ private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
+ assert(metrics1.readMethod === metrics2.readMethod)
+ assert(metrics1.bytesRead === metrics2.bytesRead)
+ }
+
private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
assert(bm1.executorId === bm2.executorId)
assert(bm1.host === bm2.host)
@@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(w1, w2)
}
+ private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
+ assertEquals(i1, i2)
+ }
+
private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
assertEquals(t1, t2)
}
@@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite {
new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
}
- private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
+ /**
+ * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
+ * set to true) or read data from a shuffle otherwise.
+ */
+ private def makeTaskMetrics(
+ a: Long,
+ b: Long,
+ c: Long,
+ d: Long,
+ e: Int,
+ f: Int,
+ hasHadoopInput: Boolean) = {
val t = new TaskMetrics
- val sr = new ShuffleReadMetrics
val sw = new ShuffleWriteMetrics
t.hostname = "localhost"
t.executorDeserializeTime = a
@@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite {
t.jvmGCTime = d
t.resultSerializationTime = a + b
t.memoryBytesSpilled = a + c
- sr.shuffleFinishTime = b + c
- sr.totalBlocksFetched = e + f
- sr.remoteBytesRead = b + d
- sr.localBlocksFetched = e
- sr.fetchWaitTime = a + d
- sr.remoteBlocksFetched = f
+
+ if (hasHadoopInput) {
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ inputMetrics.bytesRead = d + e + f
+ t.inputMetrics = Some(inputMetrics)
+ } else {
+ val sr = new ShuffleReadMetrics
+ sr.shuffleFinishTime = b + c
+ sr.totalBlocksFetched = e + f
+ sr.remoteBytesRead = b + d
+ sr.localBlocksFetched = e
+ sr.fetchWaitTime = a + d
+ sr.remoteBlocksFetched = f
+ t.shuffleReadMetrics = Some(sr)
+ }
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d
- t.shuffleReadMetrics = Some(sr)
t.shuffleWriteMetrics = Some(sw)
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
@@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite {
| },
| "Shuffle Write Metrics":{
| "Shuffle Bytes Written":1200,
- | "Shuffle Write Time":1500},
- | "Updated Blocks":[
+ | "Shuffle Write Time":1500
+ | },
+ | "Updated Blocks":[
| {"Block ID":"rdd_0_0",
| "Status":{
| "Storage Level":{
@@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite {
|}
""".stripMargin
+ private val taskEndWithHadoopInputJsonString =
+ """
+ |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+ |"Task End Reason":{"Reason":"Success"},
+ |"Task Info":{
+ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ |},
+ |"Task Metrics":{
+ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+ | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+ | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+ | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
+ | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
+ | "Updated Blocks":[
+ | {"Block ID":"rdd_0_0",
+ | "Status":{
+ | "Storage Level":{
+ | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+ | "Replication":2
+ | },
+ | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ | }
+ | }
+ | ]}
+ |}
+ """
+
private val jobStartJsonString =
"""
{"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":