From 602734084c05a79c18446a3c2c051740dba143b3 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Mon, 18 Apr 2016 15:17:29 -0700
Subject: [SPARK-14628][CORE][FOLLOW-UP] Always track read/write metrics

## What changes were proposed in this pull request?

This PR is a follow-up to https://github.com/apache/spark/pull/12417: we now always track input/output/shuffle metrics in the Spark JSON protocol and status API. Most of the line changes come from re-generating the golden answer for `HistoryServerSuite`, which adds a lot of 0 values for read/write metrics.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #12462 from cloud-fan/follow.
---
 .../org/apache/spark/executor/InputMetrics.scala   |   5 -
 .../org/apache/spark/executor/OutputMetrics.scala  |   5 -
 .../apache/spark/executor/ShuffleReadMetrics.scala |   5 -
 .../spark/executor/ShuffleWriteMetrics.scala       |   5 -
 .../spark/status/api/v1/AllStagesResource.scala    | 138 +++++++-------------
 .../scala/org/apache/spark/status/api/v1/api.scala |  18 +--
 .../scala/org/apache/spark/util/JsonProtocol.scala |  22 +---
 7 files changed, 59 insertions(+), 139 deletions(-)

(limited to 'core/src/main/scala')

diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 2181bde9f0..0ec81d8d35 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -58,11 +58,6 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
    */
   def recordsRead: Long = _recordsRead.localValue
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (bytesRead | recordsRead) != 0
-
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 7f20f6bf0d..5b36cc4739 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -57,11 +57,6 @@ class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten:
    */
   def recordsWritten: Long = _recordsWritten.localValue
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (bytesWritten | recordsWritten) != 0
-
   private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
   private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 9c78995ff3..47cfb74b9e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -102,11 +102,6 @@ class ShuffleReadMetrics private (
    */
   def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (totalBytesRead | totalBlocksFetched | recordsRead | fetchWaitTime) != 0
-
   private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
   private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
   private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index cf570e1f9d..704dee747e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -68,11 +68,6 @@ class ShuffleWriteMetrics private (
    */
   def writeTime: Long = _writeTime.localValue
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (writeTime | recordsWritten | bytesWritten) != 0
-
   private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
   private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
   private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 85452d6497..eddc36edc9 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -167,47 +167,32 @@ private[v1] object AllStagesResource {
     // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just
     // implement one "build" method, which just builds the quantiles for each field.
 
-    val inputMetrics: Option[InputMetricDistributions] =
+    val inputMetrics: InputMetricDistributions =
       new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = {
-          if (raw.inputMetrics.isUpdated) {
-            Some(raw.inputMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalInputMetrics = raw.inputMetrics
 
         def build: InputMetricDistributions = new InputMetricDistributions(
           bytesRead = submetricQuantiles(_.bytesRead),
           recordsRead = submetricQuantiles(_.recordsRead)
         )
-      }.metricOption
+      }.build
 
-    val outputMetrics: Option[OutputMetricDistributions] =
+    val outputMetrics: OutputMetricDistributions =
       new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = {
-          if (raw.outputMetrics.isUpdated) {
-            Some(raw.outputMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalOutputMetrics = raw.outputMetrics
+
         def build: OutputMetricDistributions = new OutputMetricDistributions(
           bytesWritten = submetricQuantiles(_.bytesWritten),
           recordsWritten = submetricQuantiles(_.recordsWritten)
         )
-      }.metricOption
+      }.build
 
-    val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] =
+    val shuffleReadMetrics: ShuffleReadMetricDistributions =
       new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
         quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = {
-          if (raw.shuffleReadMetrics.isUpdated) {
-            Some(raw.shuffleReadMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleReadMetrics =
+          raw.shuffleReadMetrics
+
         def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
           readBytes = submetricQuantiles(_.totalBytesRead),
           readRecords = submetricQuantiles(_.recordsRead),
@@ -217,24 +202,20 @@ private[v1] object AllStagesResource {
           totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
           fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
         )
-      }.metricOption
+      }.build
 
-    val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] =
+    val shuffleWriteMetrics: ShuffleWriteMetricDistributions =
       new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
        quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = {
-          if (raw.shuffleWriteMetrics.isUpdated) {
-            Some(raw.shuffleWriteMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleWriteMetrics =
+          raw.shuffleWriteMetrics
+
         def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
           writeBytes = submetricQuantiles(_.bytesWritten),
           writeRecords = submetricQuantiles(_.recordsWritten),
           writeTime = submetricQuantiles(_.writeTime)
         )
-      }.metricOption
+      }.build
 
     new TaskMetricDistributions(
       quantiles = quantiles,
@@ -273,84 +254,55 @@ private[v1] object AllStagesResource {
     )
   }
 
-  def convertInputMetrics(internal: InternalInputMetrics): Option[InputMetrics] = {
-    if (internal.isUpdated) {
-      Some(new InputMetrics(
-        bytesRead = internal.bytesRead,
-        recordsRead = internal.recordsRead
-      ))
-    } else {
-      None
-    }
+  def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
+    new InputMetrics(
+      bytesRead = internal.bytesRead,
+      recordsRead = internal.recordsRead
+    )
   }
 
-  def convertOutputMetrics(internal: InternalOutputMetrics): Option[OutputMetrics] = {
-    if (internal.isUpdated) {
-      Some(new OutputMetrics(
-        bytesWritten = internal.bytesWritten,
-        recordsWritten = internal.recordsWritten
-      ))
-    } else {
-      None
-    }
+  def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
+    new OutputMetrics(
+      bytesWritten = internal.bytesWritten,
+      recordsWritten = internal.recordsWritten
+    )
   }
 
-  def convertShuffleReadMetrics(
-      internal: InternalShuffleReadMetrics): Option[ShuffleReadMetrics] = {
-    if (internal.isUpdated) {
-      Some(new ShuffleReadMetrics(
-        remoteBlocksFetched = internal.remoteBlocksFetched,
-        localBlocksFetched = internal.localBlocksFetched,
-        fetchWaitTime = internal.fetchWaitTime,
-        remoteBytesRead = internal.remoteBytesRead,
-        totalBlocksFetched = internal.totalBlocksFetched,
-        recordsRead = internal.recordsRead
-      ))
-    } else {
-      None
-    }
+  def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
+    new ShuffleReadMetrics(
+      remoteBlocksFetched = internal.remoteBlocksFetched,
+      localBlocksFetched = internal.localBlocksFetched,
+      fetchWaitTime = internal.fetchWaitTime,
+      remoteBytesRead = internal.remoteBytesRead,
+      localBytesRead = internal.localBytesRead,
+      recordsRead = internal.recordsRead
+    )
   }
 
-  def convertShuffleWriteMetrics(
-      internal: InternalShuffleWriteMetrics): Option[ShuffleWriteMetrics] = {
-    if ((internal.bytesWritten | internal.writeTime | internal.recordsWritten) == 0) {
-      None
-    } else {
-      Some(new ShuffleWriteMetrics(
-        bytesWritten = internal.bytesWritten,
-        writeTime = internal.writeTime,
-        recordsWritten = internal.recordsWritten
-      ))
-    }
+  def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
+    new ShuffleWriteMetrics(
+      bytesWritten = internal.bytesWritten,
+      writeTime = internal.writeTime,
+      recordsWritten = internal.recordsWritten
+    )
   }
 }
 
 /**
- * Helper for getting distributions from nested metric types. Many of the metrics we want are
- * contained in options inside TaskMetrics (eg., ShuffleWriteMetrics). This makes it easy to handle
- * the options (returning None if the metrics are all empty), and extract the quantiles for each
- * metric. After creating an instance, call metricOption to get the result type.
+ * Helper for getting distributions from nested metric types.
  */
 private[v1] abstract class MetricHelper[I, O](
     rawMetrics: Seq[InternalTaskMetrics],
     quantiles: Array[Double]) {
 
-  def getSubmetrics(raw: InternalTaskMetrics): Option[I]
+  def getSubmetrics(raw: InternalTaskMetrics): I
 
   def build: O
 
-  val data: Seq[I] = rawMetrics.flatMap(getSubmetrics)
+  val data: Seq[I] = rawMetrics.map(getSubmetrics)
 
   /** applies the given function to all input metrics, and returns the quantiles */
   def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
     Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
   }
-
-  def metricOption: Option[O] = {
-    if (data.isEmpty) {
-      None
-    } else {
-      Some(build)
-    }
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index ebbbf48148..ff28796a60 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -172,10 +172,10 @@ class TaskMetrics private[spark](
     val resultSerializationTime: Long,
     val memoryBytesSpilled: Long,
     val diskBytesSpilled: Long,
-    val inputMetrics: Option[InputMetrics],
-    val outputMetrics: Option[OutputMetrics],
-    val shuffleReadMetrics: Option[ShuffleReadMetrics],
-    val shuffleWriteMetrics: Option[ShuffleWriteMetrics])
+    val inputMetrics: InputMetrics,
+    val outputMetrics: OutputMetrics,
+    val shuffleReadMetrics: ShuffleReadMetrics,
+    val shuffleWriteMetrics: ShuffleWriteMetrics)
 
 class InputMetrics private[spark](
     val bytesRead: Long,
@@ -190,7 +190,7 @@ class ShuffleReadMetrics private[spark](
     val localBlocksFetched: Int,
     val fetchWaitTime: Long,
     val remoteBytesRead: Long,
-    val totalBlocksFetched: Int,
+    val localBytesRead: Long,
     val recordsRead: Long)
 
 class ShuffleWriteMetrics private[spark](
@@ -209,10 +209,10 @@ class TaskMetricDistributions private[spark](
     val memoryBytesSpilled: IndexedSeq[Double],
     val diskBytesSpilled: IndexedSeq[Double],
 
-    val inputMetrics: Option[InputMetricDistributions],
-    val outputMetrics: Option[OutputMetricDistributions],
-    val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
-    val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions])
+    val inputMetrics: InputMetricDistributions,
+    val outputMetrics: OutputMetricDistributions,
+    val shuffleReadMetrics: ShuffleReadMetricDistributions,
+    val shuffleWriteMetrics: ShuffleWriteMetricDistributions)
 
 class InputMetricDistributions private[spark](
     val bytesRead: IndexedSeq[Double],
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 17b33c7c62..38ca3224ff 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -326,39 +326,27 @@ private[spark] object JsonProtocol {
   }
 
   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
-    val shuffleReadMetrics: JValue = if (taskMetrics.shuffleReadMetrics.isUpdated) {
+    val shuffleReadMetrics: JValue =
       ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
       ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
       ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
       ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
       ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
       ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
-    } else {
-      JNothing
-    }
-    val shuffleWriteMetrics: JValue = if (taskMetrics.shuffleWriteMetrics.isUpdated) {
+    val shuffleWriteMetrics: JValue =
       ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
       ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
       ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
-    } else {
-      JNothing
-    }
-    val inputMetrics: JValue = if (taskMetrics.inputMetrics.isUpdated) {
+    val inputMetrics: JValue =
       ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
       ("Records Read" -> taskMetrics.inputMetrics.recordsRead)
-    } else {
-      JNothing
-    }
-    val outputMetrics: JValue = if (taskMetrics.outputMetrics.isUpdated) {
+    val outputMetrics: JValue =
       ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
       ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
-    } else {
-      JNothing
-    }
     val updatedBlocks = JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
       ("Block ID" -> id.toString) ~
-      ("Status" -> blockStatusToJson(status))
+        ("Status" -> blockStatusToJson(status))
     })
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
-- cgit v1.2.3
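
For readers skimming the `AllStagesResource` hunks above: the refactor works because `getSubmetrics` is now total (the metrics objects always exist), so `MetricHelper` can always `build` distributions, and tasks that did no I/O simply contribute zeros to the quantiles. Below is a minimal, self-contained Scala sketch of that pattern, using simplified stand-in types and a naive sort-based quantile in place of Spark's `Distribution`; it is illustrative only, not Spark's actual classes.

```scala
// Minimal sketch of the post-patch MetricHelper pattern (stand-in types, not Spark's).
object MetricHelperSketch {

  // Stand-in for InternalTaskMetrics: input metrics are always present, never an Option.
  case class TaskMetrics(bytesRead: Long, recordsRead: Long)

  case class InputMetricDistributions(
      bytesRead: IndexedSeq[Double],
      recordsRead: IndexedSeq[Double])

  abstract class MetricHelper[I, O](rawMetrics: Seq[TaskMetrics], quantiles: Array[Double]) {
    // Total function now: no Option, no isUpdated check.
    def getSubmetrics(raw: TaskMetrics): I

    def build: O

    val data: Seq[I] = rawMetrics.map(getSubmetrics)

    // Naive nearest-rank quantile via sorting; Spark uses org.apache.spark.util.Distribution.
    // Like Spark's version, this assumes at least one task's metrics are present.
    def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
      val sorted = data.map(f).sorted.toIndexedSeq
      quantiles.map(q => sorted(((sorted.size - 1) * q).toInt)).toIndexedSeq
    }
  }

  def main(args: Array[String]): Unit = {
    // A task with all-zero metrics still participates in the distribution.
    val raw = Seq(TaskMetrics(0L, 0L), TaskMetrics(100L, 10L), TaskMetrics(200L, 20L))

    val dist = new MetricHelper[TaskMetrics, InputMetricDistributions](raw, Array(0.0, 0.5, 1.0)) {
      def getSubmetrics(raw: TaskMetrics): TaskMetrics = raw
      def build: InputMetricDistributions = InputMetricDistributions(
        bytesRead = submetricQuantiles(_.bytesRead.toDouble),
        recordsRead = submetricQuantiles(_.recordsRead.toDouble)
      )
    }.build // .build replaces the old .metricOption; the result is never None

    println(dist) // InputMetricDistributions(Vector(0.0, 100.0, 200.0),Vector(0.0, 10.0, 20.0))
  }
}
```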
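
The `JsonProtocol` change is easiest to see from the consumer side: the four metric blocks are now always present in the task-metrics JSON, with explicit zeros, where before the whole block collapsed to `JNothing` and the keys were omitted. A small json4s sketch of the new shape follows (json4s is the library `JsonProtocol` builds on; the object name and values here are illustrative). Downstream parsers that treated a missing block as "no metrics" should now check for zero values instead.

```scala
import org.json4s.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object AlwaysEmitMetricsSketch {
  def main(args: Array[String]): Unit = {
    // A task that read nothing: before this patch taskMetricsToJson emitted
    // JNothing here and the keys disappeared; now consumers always see the keys.
    val bytesRead = 0L
    val recordsRead = 0L

    val inputMetrics: JValue =
      ("Bytes Read" -> bytesRead) ~
      ("Records Read" -> recordsRead)

    println(compact(render(inputMetrics))) // {"Bytes Read":0,"Records Read":0}
  }
}
```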