diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-09-20 16:03:17 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-09-20 16:03:17 -0700 |
commit | 7c8ad1c0838762f5b632f683834c88a711aef4dd (patch) | |
tree | fbbdf3ea9651d18187e37a66f3f831bbaac61a18 | |
parent | 5f8833c672ab64aa5886a8239ae2ff2a8ea42363 (diff) | |
download | spark-7c8ad1c0838762f5b632f683834c88a711aef4dd.tar.gz spark-7c8ad1c0838762f5b632f683834c88a711aef4dd.tar.bz2 spark-7c8ad1c0838762f5b632f683834c88a711aef4dd.zip |
SPARK-3574. Shuffle finish time always reported as -1
The included test waits 100 ms after job completion for task completion events to come in so it can verify they have reasonable finish times. Does anyone know a better way to wait on listener events that are expected to come in?
Author: Sandy Ryza <sandy@cloudera.com>
Closes #2440 from sryza/sandy-spark-3574 and squashes the following commits:
c81439b [Sandy Ryza] Fix test failure
b340956 [Sandy Ryza] SPARK-3574. Remove shuffleFinishTime metric
4 files changed, 0 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 99a88c1345..3e49b6235a 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -137,7 +137,6 @@ class TaskMetrics extends Serializable { merged.localBlocksFetched += depMetrics.localBlocksFetched merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched merged.remoteBytesRead += depMetrics.remoteBytesRead - merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime) } _shuffleReadMetrics = Some(merged) } @@ -178,11 +177,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { @DeveloperApi class ShuffleReadMetrics extends Serializable { /** - * Absolute time when this task finished reading shuffle data - */ - var shuffleFinishTime: Long = -1 - - /** * Number of blocks fetched in this shuffle by this task (remote or local) */ def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 4d6b5c8188..ceb434feb6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -171,7 +171,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener } val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match { case Some(metrics) => - " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c4dddb2d10..6a48f673c4 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -255,7 +255,6 @@ private[spark] object JsonProtocol { } def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = { - ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~ ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~ ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ @@ -590,7 +589,6 @@ private[spark] object JsonProtocol { def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { val metrics = new ShuffleReadMetrics - metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long] metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int] metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int] metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long] diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 2b45d8b695..f1f88c5fd3 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -356,7 +356,6 @@ class JsonProtocolSuite extends FunSuite { } private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) { - assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime) assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched) assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched) assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime) @@ -568,7 +567,6 @@ class JsonProtocolSuite extends FunSuite { t.inputMetrics = Some(inputMetrics) } else { val sr = new ShuffleReadMetrics - sr.shuffleFinishTime = b + c sr.remoteBytesRead = b + d sr.localBlocksFetched = e sr.fetchWaitTime = a + d @@ -806,7 +804,6 @@ class JsonProtocolSuite extends FunSuite { | "Memory Bytes Spilled": 800, | "Disk Bytes Spilled": 0, | "Shuffle Read Metrics": { - | "Shuffle Finish Time": 900, | "Remote Blocks Fetched": 800, | "Local Blocks Fetched": 700, | "Fetch Wait Time": 900, |