From 7c8ad1c0838762f5b632f683834c88a711aef4dd Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Sat, 20 Sep 2014 16:03:17 -0700 Subject: SPARK-3574. Shuffle finish time always reported as -1 The included test waits 100 ms after job completion for task completion events to come in so it can verify they have reasonable finish times. Does anyone know a better way to wait on listener events that are expected to come in? Author: Sandy Ryza Closes #2440 from sryza/sandy-spark-3574 and squashes the following commits: c81439b [Sandy Ryza] Fix test failure b340956 [Sandy Ryza] SPARK-3574. Remove shuffleFinishTime metric --- core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 6 ------ core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala | 1 - core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 -- 3 files changed, 9 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 99a88c1345..3e49b6235a 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -137,7 +137,6 @@ class TaskMetrics extends Serializable { merged.localBlocksFetched += depMetrics.localBlocksFetched merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched merged.remoteBytesRead += depMetrics.remoteBytesRead - merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime) } _shuffleReadMetrics = Some(merged) } @@ -177,11 +176,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { */ @DeveloperApi class ShuffleReadMetrics extends Serializable { - /** - * Absolute time when this task finished reading shuffle data - */ - var shuffleFinishTime: Long = -1 - /** * Number of blocks fetched in this shuffle by this task (remote or local) */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 4d6b5c8188..ceb434feb6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -171,7 +171,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener } val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match { case Some(metrics) => - " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c4dddb2d10..6a48f673c4 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -255,7 +255,6 @@ private[spark] object JsonProtocol { } def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = { - ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~ ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~ ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ @@ -590,7 +589,6 @@ private[spark] object JsonProtocol { def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { val metrics = new ShuffleReadMetrics - metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long] metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int] metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int] metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long] -- cgit v1.2.3