author    Wenchen Fan <wenchen@databricks.com>    2016-04-18 15:17:29 -0700
committer Reynold Xin <rxin@databricks.com>       2016-04-18 15:17:29 -0700
commit    602734084c05a79c18446a3c2c051740dba143b3 (patch)
tree      5cddac1f7a8add17ec705205e9cc5001d3d63d69 /core/src/main/scala
parent    6ff0435858eed8310c0298ef0394053dfe06df9e (diff)
[SPARK-14628][CORE][FOLLOW-UP] Always tracking read/write metrics
## What changes were proposed in this pull request?

This PR is a follow-up to https://github.com/apache/spark/pull/12417: we now always track input/output/shuffle metrics in the Spark JSON protocol and status API. Most of the line changes come from regenerating the gold answer for `HistoryServerSuite`, which now includes many zero values for read/write metrics.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12462 from cloud-fan/follow.
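The shape change is easiest to see in miniature. Below is a minimal, self-contained Scala sketch (toy case classes, not Spark's actual types) of how the status API's metric fields move from `Option[T]` to an always-present `T` that defaults to zeros:

```scala
// Toy stand-ins for the status API types, before and after this commit.
final case class InputMetrics(bytesRead: Long, recordsRead: Long)

final case class TaskMetricsBefore(inputMetrics: Option[InputMetrics])
final case class TaskMetricsAfter(inputMetrics: InputMetrics)

object ShapeChange extends App {
  // A task that read no input: previously None, now explicit zeros.
  val before = TaskMetricsBefore(None)
  val after  = TaskMetricsAfter(InputMetrics(0L, 0L))

  // Consumers no longer unwrap an Option and pick their own default:
  println(before.inputMetrics.map(_.bytesRead).getOrElse(0L)) // 0
  println(after.inputMetrics.bytesRead)                       // 0
}
```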
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala             5
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala            5
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala       5
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala      5
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala 138
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala                18
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                22
7 files changed, 59 insertions, 139 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 2181bde9f0..0ec81d8d35 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -58,11 +58,6 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
*/
def recordsRead: Long = _recordsRead.localValue
- /**
- * Returns true if this metrics has been updated before.
- */
- def isUpdated: Boolean = (bytesRead | recordsRead) != 0
-
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
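The same `isUpdated` helper is deleted from all four metric classes in this diff. It relied on a bitwise-OR idiom: the OR of non-negative counters is nonzero exactly when at least one counter is, so one comparison covers them all. A standalone sketch of the removed idiom:

```scala
object IsUpdatedIdiom extends App {
  // The removed check, extracted: OR of non-negative Longs is 0
  // only when every operand is 0.
  def wasUpdated(bytesRead: Long, recordsRead: Long): Boolean =
    (bytesRead | recordsRead) != 0

  assert(!wasUpdated(0L, 0L)) // untouched metrics report all zeros
  assert(wasUpdated(0L, 3L))  // any nonzero counter makes the OR nonzero
}
```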
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 7f20f6bf0d..5b36cc4739 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -57,11 +57,6 @@ class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten:
*/
def recordsWritten: Long = _recordsWritten.localValue
- /**
- * Returns true if this metrics has been updated before.
- */
- def isUpdated: Boolean = (bytesWritten | recordsWritten) != 0
-
private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 9c78995ff3..47cfb74b9e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -102,11 +102,6 @@ class ShuffleReadMetrics private (
*/
def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
- /**
- * Returns true if this metrics has been updated before.
- */
- def isUpdated: Boolean = (totalBytesRead | totalBlocksFetched | recordsRead | fetchWaitTime) != 0
-
private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index cf570e1f9d..704dee747e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -68,11 +68,6 @@ class ShuffleWriteMetrics private (
*/
def writeTime: Long = _writeTime.localValue
- /**
- * Returns true if this metrics has been updated before.
- */
- def isUpdated: Boolean = (writeTime | recordsWritten | bytesWritten) != 0
-
private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 85452d6497..eddc36edc9 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -167,47 +167,32 @@ private[v1] object AllStagesResource {
// to make it a little easier to deal w/ all of the nested options. Mostly it lets us just
// implement one "build" method, which just builds the quantiles for each field.
- val inputMetrics: Option[InputMetricDistributions] =
+ val inputMetrics: InputMetricDistributions =
new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = {
- if (raw.inputMetrics.isUpdated) {
- Some(raw.inputMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalInputMetrics = raw.inputMetrics
def build: InputMetricDistributions = new InputMetricDistributions(
bytesRead = submetricQuantiles(_.bytesRead),
recordsRead = submetricQuantiles(_.recordsRead)
)
- }.metricOption
+ }.build
- val outputMetrics: Option[OutputMetricDistributions] =
+ val outputMetrics: OutputMetricDistributions =
new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = {
- if (raw.outputMetrics.isUpdated) {
- Some(raw.outputMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalOutputMetrics = raw.outputMetrics
+
def build: OutputMetricDistributions = new OutputMetricDistributions(
bytesWritten = submetricQuantiles(_.bytesWritten),
recordsWritten = submetricQuantiles(_.recordsWritten)
)
- }.metricOption
+ }.build
- val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] =
+ val shuffleReadMetrics: ShuffleReadMetricDistributions =
new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = {
- if (raw.shuffleReadMetrics.isUpdated) {
- Some(raw.shuffleReadMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleReadMetrics =
+ raw.shuffleReadMetrics
+
def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
readBytes = submetricQuantiles(_.totalBytesRead),
readRecords = submetricQuantiles(_.recordsRead),
@@ -217,24 +202,20 @@ private[v1] object AllStagesResource {
totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
)
- }.metricOption
+ }.build
- val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] =
+ val shuffleWriteMetrics: ShuffleWriteMetricDistributions =
new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = {
- if (raw.shuffleWriteMetrics.isUpdated) {
- Some(raw.shuffleWriteMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleWriteMetrics =
+ raw.shuffleWriteMetrics
+
def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
writeBytes = submetricQuantiles(_.bytesWritten),
writeRecords = submetricQuantiles(_.recordsWritten),
writeTime = submetricQuantiles(_.writeTime)
)
- }.metricOption
+ }.build
new TaskMetricDistributions(
quantiles = quantiles,
@@ -273,84 +254,55 @@ private[v1] object AllStagesResource {
)
}
- def convertInputMetrics(internal: InternalInputMetrics): Option[InputMetrics] = {
- if (internal.isUpdated) {
- Some(new InputMetrics(
- bytesRead = internal.bytesRead,
- recordsRead = internal.recordsRead
- ))
- } else {
- None
- }
+ def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
+ new InputMetrics(
+ bytesRead = internal.bytesRead,
+ recordsRead = internal.recordsRead
+ )
}
- def convertOutputMetrics(internal: InternalOutputMetrics): Option[OutputMetrics] = {
- if (internal.isUpdated) {
- Some(new OutputMetrics(
- bytesWritten = internal.bytesWritten,
- recordsWritten = internal.recordsWritten
- ))
- } else {
- None
- }
+ def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
+ new OutputMetrics(
+ bytesWritten = internal.bytesWritten,
+ recordsWritten = internal.recordsWritten
+ )
}
- def convertShuffleReadMetrics(
- internal: InternalShuffleReadMetrics): Option[ShuffleReadMetrics] = {
- if (internal.isUpdated) {
- Some(new ShuffleReadMetrics(
- remoteBlocksFetched = internal.remoteBlocksFetched,
- localBlocksFetched = internal.localBlocksFetched,
- fetchWaitTime = internal.fetchWaitTime,
- remoteBytesRead = internal.remoteBytesRead,
- totalBlocksFetched = internal.totalBlocksFetched,
- recordsRead = internal.recordsRead
- ))
- } else {
- None
- }
+ def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
+ new ShuffleReadMetrics(
+ remoteBlocksFetched = internal.remoteBlocksFetched,
+ localBlocksFetched = internal.localBlocksFetched,
+ fetchWaitTime = internal.fetchWaitTime,
+ remoteBytesRead = internal.remoteBytesRead,
+ localBytesRead = internal.localBytesRead,
+ recordsRead = internal.recordsRead
+ )
}
- def convertShuffleWriteMetrics(
- internal: InternalShuffleWriteMetrics): Option[ShuffleWriteMetrics] = {
- if ((internal.bytesWritten | internal.writeTime | internal.recordsWritten) == 0) {
- None
- } else {
- Some(new ShuffleWriteMetrics(
- bytesWritten = internal.bytesWritten,
- writeTime = internal.writeTime,
- recordsWritten = internal.recordsWritten
- ))
- }
+ def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
+ new ShuffleWriteMetrics(
+ bytesWritten = internal.bytesWritten,
+ writeTime = internal.writeTime,
+ recordsWritten = internal.recordsWritten
+ )
}
}
/**
- * Helper for getting distributions from nested metric types. Many of the metrics we want are
- * contained in options inside TaskMetrics (eg., ShuffleWriteMetrics). This makes it easy to handle
- * the options (returning None if the metrics are all empty), and extract the quantiles for each
- * metric. After creating an instance, call metricOption to get the result type.
+ * Helper for getting distributions from nested metric types.
*/
private[v1] abstract class MetricHelper[I, O](
rawMetrics: Seq[InternalTaskMetrics],
quantiles: Array[Double]) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[I]
+ def getSubmetrics(raw: InternalTaskMetrics): I
def build: O
- val data: Seq[I] = rawMetrics.flatMap(getSubmetrics)
+ val data: Seq[I] = rawMetrics.map(getSubmetrics)
/** applies the given function to all input metrics, and returns the quantiles */
def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
}
-
- def metricOption: Option[O] = {
- if (data.isEmpty) {
- None
- } else {
- Some(build)
- }
- }
}
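With `getSubmetrics` now total (every task carries every metric object), a `MetricHelper` subclass just maps, builds, and returns; callers end with `.build` instead of `.metricOption`, since the empty-Option case no longer exists. A self-contained sketch of the pattern under that assumption, with a naive nearest-rank routine standing in for Spark's `Distribution`:

```scala
// Toy version of the simplified MetricHelper; M = raw task metrics,
// I = extracted submetrics, O = the distributions object to build.
abstract class MetricHelper[M, I, O](rawMetrics: Seq[M], quantiles: Array[Double]) {
  def getSubmetrics(raw: M): I // total now: no Option, no flatMap
  def build: O

  val data: Seq[I] = rawMetrics.map(getSubmetrics)

  // Naive nearest-rank stand-in for Distribution.getQuantiles;
  // assumes rawMetrics is non-empty, as the real caller does.
  def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
    val sorted = data.map(f).sorted.toIndexedSeq
    quantiles.map(q => sorted(((sorted.size - 1) * q).toInt)).toIndexedSeq
  }
}

object MetricHelperDemo extends App {
  case class Task(bytesRead: Double)
  case class Dist(bytesRead: IndexedSeq[Double])

  val dist = new MetricHelper[Task, Task, Dist](
      Seq(Task(1), Task(5), Task(9)), Array(0.0, 0.5, 1.0)) {
    def getSubmetrics(raw: Task): Task = raw
    def build: Dist = Dist(submetricQuantiles(_.bytesRead))
  }.build // callers end with .build instead of .metricOption

  println(dist) // Dist(Vector(1.0, 5.0, 9.0))
}
```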
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index ebbbf48148..ff28796a60 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -172,10 +172,10 @@ class TaskMetrics private[spark](
val resultSerializationTime: Long,
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
- val inputMetrics: Option[InputMetrics],
- val outputMetrics: Option[OutputMetrics],
- val shuffleReadMetrics: Option[ShuffleReadMetrics],
- val shuffleWriteMetrics: Option[ShuffleWriteMetrics])
+ val inputMetrics: InputMetrics,
+ val outputMetrics: OutputMetrics,
+ val shuffleReadMetrics: ShuffleReadMetrics,
+ val shuffleWriteMetrics: ShuffleWriteMetrics)
class InputMetrics private[spark](
val bytesRead: Long,
@@ -190,7 +190,7 @@ class ShuffleReadMetrics private[spark](
val localBlocksFetched: Int,
val fetchWaitTime: Long,
val remoteBytesRead: Long,
- val totalBlocksFetched: Int,
+ val localBytesRead: Long,
val recordsRead: Long)
class ShuffleWriteMetrics private[spark](
@@ -209,10 +209,10 @@ class TaskMetricDistributions private[spark](
val memoryBytesSpilled: IndexedSeq[Double],
val diskBytesSpilled: IndexedSeq[Double],
- val inputMetrics: Option[InputMetricDistributions],
- val outputMetrics: Option[OutputMetricDistributions],
- val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
- val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions])
+ val inputMetrics: InputMetricDistributions,
+ val outputMetrics: OutputMetricDistributions,
+ val shuffleReadMetrics: ShuffleReadMetricDistributions,
+ val shuffleWriteMetrics: ShuffleWriteMetricDistributions)
class InputMetricDistributions private[spark](
val bytesRead: IndexedSeq[Double],
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 17b33c7c62..38ca3224ff 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -326,39 +326,27 @@ private[spark] object JsonProtocol {
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
- val shuffleReadMetrics: JValue = if (taskMetrics.shuffleReadMetrics.isUpdated) {
+ val shuffleReadMetrics: JValue =
("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
- } else {
- JNothing
- }
- val shuffleWriteMetrics: JValue = if (taskMetrics.shuffleWriteMetrics.isUpdated) {
+ val shuffleWriteMetrics: JValue =
("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
- } else {
- JNothing
- }
- val inputMetrics: JValue = if (taskMetrics.inputMetrics.isUpdated) {
+ val inputMetrics: JValue =
("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
("Records Read" -> taskMetrics.inputMetrics.recordsRead)
- } else {
- JNothing
- }
- val outputMetrics: JValue = if (taskMetrics.outputMetrics.isUpdated) {
+ val outputMetrics: JValue =
("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
- } else {
- JNothing
- }
val updatedBlocks =
JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
- ("Status" -> blockStatusToJson(status))
+ ("Status" -> blockStatusToJson(status))
})
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~