-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 2
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 2
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/InternalAccumulator.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 122
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala | 100
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 66
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 2
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala | 122
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala | 69
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 31
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 143
-rw-r--r--  project/MimaExcludes.scala | 5
34 files changed, 330 insertions(+), 610 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 7a60c3eb35..0e9defe5b4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
- this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
this.serializer = dep.serializer();
this.shuffleBlockResolver = shuffleBlockResolver;
}
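
The one-line change above (repeated in UnsafeShuffleWriter and UnsafeExternalSorter below) replaces lazy registration of shuffle write metrics with a direct read of the instance that TaskMetrics now always carries. A minimal Scala sketch of that caller-side pattern; the writer class is hypothetical and is placed in an org.apache.spark subpackage so it can use the private[spark] increment methods:

    package org.apache.spark.shuffle

    import org.apache.spark.TaskContext
    import org.apache.spark.executor.ShuffleWriteMetrics

    // Illustrative writer skeleton: the metrics object always exists on TaskMetrics,
    // so it is read once at construction instead of being registered on demand.
    private[spark] class SketchShuffleWriter(taskContext: TaskContext) {
      // Before this patch: taskContext.taskMetrics().registerShuffleWriteMetrics()
      private val writeMetrics: ShuffleWriteMetrics =
        taskContext.taskMetrics().shuffleWriteMetrics

      def recordWrite(records: Long, bytes: Long): Unit = {
        writeMetrics.incRecordsWritten(records)
        writeMetrics.incBytesWritten(bytes)
      }
    }
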
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 0c5fb883a8..daa63d47e6 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -118,7 +118,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.serializer = dep.serializer().newInstance();
this.partitioner = dep.partitioner();
- this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ef79b49083..3e32dd9d63 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -129,7 +129,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 0dd4ec656f..714c8737a9 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -68,14 +68,12 @@ private[spark] object InternalAccumulator {
// Names of output metrics
object output {
- val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod"
val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten"
val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten"
}
// Names of input metrics
object input {
- val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod"
val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead"
val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead"
}
@@ -110,8 +108,6 @@ private[spark] object InternalAccumulator {
case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
- case input.READ_METHOD => StringAccumulatorParam
- case output.WRITE_METHOD => StringAccumulatorParam
case _ => LongAccumulatorParam
}
}
@@ -165,7 +161,6 @@ private[spark] object InternalAccumulator {
*/
def createInputAccums(): Seq[Accumulator[_]] = {
Seq[String](
- input.READ_METHOD,
input.BYTES_READ,
input.RECORDS_READ).map(create)
}
@@ -175,7 +170,6 @@ private[spark] object InternalAccumulator {
*/
def createOutputAccums(): Seq[Accumulator[_]] = {
Seq[String](
- output.WRITE_METHOD,
output.BYTES_WRITTEN,
output.RECORDS_WRITTEN).map(create)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 83e11c5e23..2181bde9f0 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -39,31 +39,13 @@ object DataReadMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about reading data from external systems.
*/
@DeveloperApi
-class InputMetrics private (
- _bytesRead: Accumulator[Long],
- _recordsRead: Accumulator[Long],
- _readMethod: Accumulator[String])
+class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long])
extends Serializable {
private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
this(
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ),
- TaskMetrics.getAccum[String](accumMap, InternalAccumulator.input.READ_METHOD))
- }
-
- /**
- * Create a new [[InputMetrics]] that is not associated with any particular task.
- *
- * This mainly exists because of SPARK-5225, where we are forced to use a dummy [[InputMetrics]]
- * because we want to ignore metrics from a second read method. In the future, we should revisit
- * whether this is needed.
- *
- * A better alternative is [[TaskMetrics.registerInputMetrics]].
- */
- private[executor] def this() {
- this(InternalAccumulator.createInputAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ))
}
/**
@@ -77,13 +59,12 @@ class InputMetrics private (
def recordsRead: Long = _recordsRead.localValue
/**
- * The source from which this task reads its input.
+ * Returns true if this metrics has been updated before.
*/
- def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
+ def isUpdated: Boolean = (bytesRead | recordsRead) != 0
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
- private[spark] def setReadMethod(v: DataReadMethod.Value): Unit = _readMethod.setValue(v.toString)
}
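
With the read-method accumulator gone, "did this task read any input at all" is answered by the new isUpdated flag (all counters zero means untouched) rather than by the presence of an Option. A small sketch of that check; it assumes isUpdated keeps the visibility shown in the hunk above, and the helper name is illustrative:

    import org.apache.spark.executor.TaskMetrics

    // Summarize a task's input using the always-present InputMetrics.
    def describeInput(tm: TaskMetrics): String = {
      val in = tm.inputMetrics
      if (in.isUpdated) s"${in.bytesRead} bytes / ${in.recordsRead} records read"
      else "no input"
    }
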
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 93f953846f..7f20f6bf0d 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -38,17 +38,13 @@ object DataWriteMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about writing data to external systems.
*/
@DeveloperApi
-class OutputMetrics private (
- _bytesWritten: Accumulator[Long],
- _recordsWritten: Accumulator[Long],
- _writeMethod: Accumulator[String])
+class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long])
extends Serializable {
private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
this(
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN),
- TaskMetrics.getAccum[String](accumMap, InternalAccumulator.output.WRITE_METHOD))
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN))
}
/**
@@ -62,13 +58,10 @@ class OutputMetrics private (
def recordsWritten: Long = _recordsWritten.localValue
/**
- * The source to which this task writes its output.
+ * Returns true if this metrics has been updated before.
*/
- def writeMethod: DataWriteMethod.Value = DataWriteMethod.withName(_writeMethod.localValue)
+ def isUpdated: Boolean = (bytesWritten | recordsWritten) != 0
private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
- private[spark] def setWriteMethod(v: DataWriteMethod.Value): Unit =
- _writeMethod.setValue(v.toString)
-
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 71a24770b5..9c78995ff3 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -53,7 +53,7 @@ class ShuffleReadMetrics private (
* many places only to merge their values together later. In the future, we should revisit
* whether this is needed.
*
- * A better alternative is [[TaskMetrics.registerTempShuffleReadMetrics]] followed by
+ * A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by
* [[TaskMetrics.mergeShuffleReadMetrics]].
*/
private[spark] def this() {
@@ -102,6 +102,11 @@ class ShuffleReadMetrics private (
*/
def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+ /**
+ * Returns true if this metrics has been updated before.
+ */
+ def isUpdated: Boolean = (totalBytesRead | totalBlocksFetched | recordsRead | fetchWaitTime) != 0
+
private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
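
The rename to createTempShuffleReadMetrics leaves the merge protocol described in the doc comment intact: each shuffle dependency read by a task gets its own temporary ShuffleReadMetrics, and a single synchronous merge folds them into the always-present instance before the task completes. A sketch of that internal sequence; both methods are private[spark], so the code is placed in a hypothetical org.apache.spark subpackage, and the counter values are made up:

    package org.apache.spark.shuffle

    import org.apache.spark.TaskContext

    private[spark] object TempShuffleReadSketch {
      // One temporary ShuffleReadMetrics per shuffle dependency read by the task.
      def readOneDependency(context: TaskContext): Unit = {
        val tmp = context.taskMetrics().createTempShuffleReadMetrics()
        tmp.incLocalBlocksFetched(4)
        tmp.incRemoteBytesRead(1024L)
        tmp.incRecordsRead(100L)
      }

      // A single merge into taskMetrics().shuffleReadMetrics before the task ends;
      // temporary values not merged here are lost, as the doc comment warns.
      def beforeTaskCompletes(context: TaskContext): Unit = {
        context.taskMetrics().mergeShuffleReadMetrics()
      }
    }
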
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index c7aaabb561..cf570e1f9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -47,7 +47,7 @@ class ShuffleWriteMetrics private (
* many places only to merge their values together later. In the future, we should revisit
* whether this is needed.
*
- * A better alternative is [[TaskMetrics.registerShuffleWriteMetrics]].
+ * A better alternative is [[TaskMetrics.shuffleWriteMetrics]].
*/
private[spark] def this() {
this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap)
@@ -68,6 +68,11 @@ class ShuffleWriteMetrics private (
*/
def writeTime: Long = _writeTime.localValue
+ /**
+ * Returns true if this metrics has been updated before.
+ */
+ def isUpdated: Boolean = (writeTime | recordsWritten | bytesWritten) != 0
+
private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index bda2a91d9d..0198364825 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -91,6 +91,14 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
private val _updatedBlockStatuses =
TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES)
+ private val _inputMetrics = new InputMetrics(initialAccumsMap)
+
+ private val _outputMetrics = new OutputMetrics(initialAccumsMap)
+
+ private val _shuffleReadMetrics = new ShuffleReadMetrics(initialAccumsMap)
+
+ private val _shuffleWriteMetrics = new ShuffleWriteMetrics(initialAccumsMap)
+
/**
* Time taken on the executor to deserialize this task.
*/
@@ -163,83 +171,23 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
TaskMetrics.getAccum[Long](initialAccumsMap, name)
}
-
- /* ========================== *
- | INPUT METRICS |
- * ========================== */
-
- private var _inputMetrics: Option[InputMetrics] = None
-
/**
* Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
* data, defined only in tasks with input.
*/
- def inputMetrics: Option[InputMetrics] = _inputMetrics
-
- /**
- * Get or create a new [[InputMetrics]] associated with this task.
- */
- private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = {
- synchronized {
- val metrics = _inputMetrics.getOrElse {
- val metrics = new InputMetrics(initialAccumsMap)
- metrics.setReadMethod(readMethod)
- _inputMetrics = Some(metrics)
- metrics
- }
- // If there already exists an InputMetric with the same read method, we can just return
- // that one. Otherwise, if the read method is different from the one previously seen by
- // this task, we return a new dummy one to avoid clobbering the values of the old metrics.
- // In the future we should try to store input metrics from all different read methods at
- // the same time (SPARK-5225).
- if (metrics.readMethod == readMethod) {
- metrics
- } else {
- val m = new InputMetrics
- m.setReadMethod(readMethod)
- m
- }
- }
- }
-
-
- /* ============================ *
- | OUTPUT METRICS |
- * ============================ */
-
- private var _outputMetrics: Option[OutputMetrics] = None
+ def inputMetrics: InputMetrics = _inputMetrics
/**
* Metrics related to writing data externally (e.g. to a distributed filesystem),
* defined only in tasks with output.
*/
- def outputMetrics: Option[OutputMetrics] = _outputMetrics
-
- /**
- * Get or create a new [[OutputMetrics]] associated with this task.
- */
- private[spark] def registerOutputMetrics(
- writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized {
- _outputMetrics.getOrElse {
- val metrics = new OutputMetrics(initialAccumsMap)
- metrics.setWriteMethod(writeMethod)
- _outputMetrics = Some(metrics)
- metrics
- }
- }
-
-
- /* ================================== *
- | SHUFFLE READ METRICS |
- * ================================== */
-
- private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+ def outputMetrics: OutputMetrics = _outputMetrics
/**
* Metrics related to shuffle read aggregated across all shuffle dependencies.
* This is defined only if there are shuffle dependencies in this task.
*/
- def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics
+ def shuffleReadMetrics: ShuffleReadMetrics = _shuffleReadMetrics
/**
* Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
@@ -257,7 +205,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
* merges the temporary values synchronously. Otherwise, all temporary data collected will
* be lost.
*/
- private[spark] def registerTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
+ private[spark] def createTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
val readMetrics = new ShuffleReadMetrics
tempShuffleReadMetrics += readMetrics
readMetrics
@@ -269,34 +217,14 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
- val metrics = new ShuffleReadMetrics(initialAccumsMap)
- metrics.setMergeValues(tempShuffleReadMetrics)
- _shuffleReadMetrics = Some(metrics)
+ _shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
}
}
- /* =================================== *
- | SHUFFLE WRITE METRICS |
- * =================================== */
-
- private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
-
/**
* Metrics related to shuffle write, defined only in shuffle map stages.
*/
- def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
-
- /**
- * Get or create a new [[ShuffleWriteMetrics]] associated with this task.
- */
- private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized {
- _shuffleWriteMetrics.getOrElse {
- val metrics = new ShuffleWriteMetrics(initialAccumsMap)
- _shuffleWriteMetrics = Some(metrics)
- metrics
- }
- }
-
+ def shuffleWriteMetrics: ShuffleWriteMetrics = _shuffleWriteMetrics
/* ========================== *
| OTHER THINGS |
@@ -316,28 +244,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
def accumulatorUpdates(): Seq[AccumulableInfo] = {
accums.map { a => a.toInfo(Some(a.localValue), None) }
}
-
- // If we are reconstructing this TaskMetrics on the driver, some metrics may already be set.
- // If so, initialize all relevant metrics classes so listeners can access them downstream.
- {
- var (hasShuffleRead, hasShuffleWrite, hasInput, hasOutput) = (false, false, false, false)
- initialAccums
- .filter { a => a.localValue != a.zero }
- .foreach { a =>
- a.name.get match {
- case sr if sr.startsWith(SHUFFLE_READ_METRICS_PREFIX) => hasShuffleRead = true
- case sw if sw.startsWith(SHUFFLE_WRITE_METRICS_PREFIX) => hasShuffleWrite = true
- case in if in.startsWith(INPUT_METRICS_PREFIX) => hasInput = true
- case out if out.startsWith(OUTPUT_METRICS_PREFIX) => hasOutput = true
- case _ =>
- }
- }
- if (hasShuffleRead) { _shuffleReadMetrics = Some(new ShuffleReadMetrics(initialAccumsMap)) }
- if (hasShuffleWrite) { _shuffleWriteMetrics = Some(new ShuffleWriteMetrics(initialAccumsMap)) }
- if (hasInput) { _inputMetrics = Some(new InputMetrics(initialAccumsMap)) }
- if (hasOutput) { _outputMetrics = Some(new OutputMetrics(initialAccumsMap)) }
- }
-
}
/**
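
Taken together, the four accessors above are the heart of the patch: inputMetrics, outputMetrics, shuffleReadMetrics and shuffleWriteMetrics are now plain members that always exist, with all-zero counters standing in for "absent". A brief listener sketch of what caller code looks like as a result; the aggregation itself is illustrative, while the API calls mirror the ExecutorsTab and ShuffleSuite changes later in this diff:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Per-task totals are read directly off the always-present sub-metrics;
    // no .foreach / .flatMap over Option values is needed anymore.
    class ShuffleTotalsListener extends SparkListener {
      @volatile private var shuffleBytesWritten = 0L
      @volatile private var shuffleBytesRead = 0L

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val m = taskEnd.taskMetrics
        if (m != null) {
          // Previously: m.shuffleWriteMetrics.foreach { w => ... += w.bytesWritten }
          shuffleBytesWritten += m.shuffleWriteMetrics.bytesWritten
          shuffleBytesRead += m.shuffleReadMetrics.totalBytesRead
        }
      }
    }
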
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 35d190b464..6b1e15572c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,7 +213,7 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics().inputMetrics
val existingBytesRead = inputMetrics.bytesRead
// Sets the thread local variable for the file's name
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 3ccd616cbf..a71c191b31 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -130,7 +130,7 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = getConf
- val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics().inputMetrics
val existingBytesRead = inputMetrics.bytesRead
// Find a function that will return the FileSystem bytes read by this thread. Do this before
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 085829af6e..7936d8e1d4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1218,7 +1218,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
context: TaskContext): Option[(OutputMetrics, () => Long)] = {
val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
bytesWrittenCallback.map { b =>
- (context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop), b)
+ (context.taskMetrics().outputMetrics, b)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 36ff3bcaae..f6e0148f78 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -332,7 +332,7 @@ abstract class RDD[T: ClassTag](
}) match {
case Left(blockResult) =>
if (readCachedBlock) {
- val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
+ val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
index 309f4b806b..3c8cab7504 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
@@ -47,19 +47,19 @@ class StatsReportListener extends SparkListener with Logging {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
implicit val sc = stageCompleted
this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
- showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)
+ showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics)
// Shuffle write
showBytesDistribution("shuffle bytes written:",
- (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)
+ (_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics)
// Fetch & I/O
showMillisDistribution("fetch wait time:",
- (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics)
+ (_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics)
showBytesDistribution("remote bytes read:",
- (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics)
+ (_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics)
showBytesDistribution("task result size:",
- (_, metric) => Some(metric.resultSize), taskInfoMetrics)
+ (_, metric) => metric.resultSize, taskInfoMetrics)
// Runtime breakdown
val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
@@ -95,17 +95,17 @@ private[spark] object StatsReportListener extends Logging {
def extractDoubleDistribution(
taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
- getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
- Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) })
+ getMetric: (TaskInfo, TaskMetrics) => Double): Option[Distribution] = {
+ Distribution(taskInfoMetrics.map { case (info, metric) => getMetric(info, metric) })
}
// Is there some way to setup the types that I can get rid of this completely?
def extractLongDistribution(
taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
- getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
+ getMetric: (TaskInfo, TaskMetrics) => Long): Option[Distribution] = {
extractDoubleDistribution(
taskInfoMetrics,
- (info, metric) => { getMetric(info, metric).map(_.toDouble) })
+ (info, metric) => { getMetric(info, metric).toDouble })
}
def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
@@ -117,9 +117,9 @@ private[spark] object StatsReportListener extends Logging {
}
def showDistribution(
- heading: String,
- dOpt: Option[Distribution],
- formatNumber: Double => String) {
+ heading: String,
+ dOpt: Option[Distribution],
+ formatNumber: Double => String) {
dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
}
@@ -129,17 +129,17 @@ private[spark] object StatsReportListener extends Logging {
}
def showDistribution(
- heading: String,
- format: String,
- getMetric: (TaskInfo, TaskMetrics) => Option[Double],
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+ heading: String,
+ format: String,
+ getMetric: (TaskInfo, TaskMetrics) => Double,
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format)
}
def showBytesDistribution(
- heading: String,
- getMetric: (TaskInfo, TaskMetrics) => Option[Long],
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+ heading: String,
+ getMetric: (TaskInfo, TaskMetrics) => Long,
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
}
@@ -157,9 +157,9 @@ private[spark] object StatsReportListener extends Logging {
}
def showMillisDistribution(
- heading: String,
- getMetric: (TaskInfo, TaskMetrics) => Option[Long],
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+ heading: String,
+ getMetric: (TaskInfo, TaskMetrics) => Long,
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
}
@@ -190,7 +190,7 @@ private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Doubl
private object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble
- val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
+ val fetchTime = Some(metrics.shuffleReadMetrics.fetchWaitTime)
val fetch = fetchTime.map(_ / denom)
val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
val other = 1.0 - (exec + fetch.getOrElse(0d))
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 876cdfaa87..5794f542b7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -67,7 +67,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
}
// Update the context task metrics for each record read.
- val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics()
+ val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(1)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 9276d95012..6c4444ffb4 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -41,7 +41,7 @@ private[spark] class HashShuffleWriter[K, V](
// we don't try deleting files, etc twice.
private var stopping = false
- private val writeMetrics = metrics.registerShuffleWriteMetrics()
+ private val writeMetrics = metrics.shuffleWriteMetrics
private val blockManager = SparkEnv.get.blockManager
private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits,
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 8ab1cee2e8..1adacabc86 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -45,7 +45,7 @@ private[spark] class SortShuffleWriter[K, V, C](
private var mapStatus: MapStatus = null
- private val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics()
+ private val writeMetrics = context.taskMetrics().shuffleWriteMetrics
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index f8d6e9fbbb..85452d6497 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -170,7 +170,11 @@ private[v1] object AllStagesResource {
val inputMetrics: Option[InputMetricDistributions] =
new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = {
- raw.inputMetrics
+ if (raw.inputMetrics.isUpdated) {
+ Some(raw.inputMetrics)
+ } else {
+ None
+ }
}
def build: InputMetricDistributions = new InputMetricDistributions(
@@ -182,7 +186,11 @@ private[v1] object AllStagesResource {
val outputMetrics: Option[OutputMetricDistributions] =
new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = {
- raw.outputMetrics
+ if (raw.outputMetrics.isUpdated) {
+ Some(raw.outputMetrics)
+ } else {
+ None
+ }
}
def build: OutputMetricDistributions = new OutputMetricDistributions(
bytesWritten = submetricQuantiles(_.bytesWritten),
@@ -194,7 +202,11 @@ private[v1] object AllStagesResource {
new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
quantiles) {
def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = {
- raw.shuffleReadMetrics
+ if (raw.shuffleReadMetrics.isUpdated) {
+ Some(raw.shuffleReadMetrics)
+ } else {
+ None
+ }
}
def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
readBytes = submetricQuantiles(_.totalBytesRead),
@@ -211,7 +223,11 @@ private[v1] object AllStagesResource {
new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
quantiles) {
def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = {
- raw.shuffleWriteMetrics
+ if (raw.shuffleWriteMetrics.isUpdated) {
+ Some(raw.shuffleWriteMetrics)
+ } else {
+ None
+ }
}
def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
writeBytes = submetricQuantiles(_.bytesWritten),
@@ -250,44 +266,62 @@ private[v1] object AllStagesResource {
resultSerializationTime = internal.resultSerializationTime,
memoryBytesSpilled = internal.memoryBytesSpilled,
diskBytesSpilled = internal.diskBytesSpilled,
- inputMetrics = internal.inputMetrics.map { convertInputMetrics },
- outputMetrics = Option(internal.outputMetrics).flatten.map { convertOutputMetrics },
- shuffleReadMetrics = internal.shuffleReadMetrics.map { convertShuffleReadMetrics },
- shuffleWriteMetrics = internal.shuffleWriteMetrics.map { convertShuffleWriteMetrics }
+ inputMetrics = convertInputMetrics(internal.inputMetrics),
+ outputMetrics = convertOutputMetrics(internal.outputMetrics),
+ shuffleReadMetrics = convertShuffleReadMetrics(internal.shuffleReadMetrics),
+ shuffleWriteMetrics = convertShuffleWriteMetrics(internal.shuffleWriteMetrics)
)
}
- def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
- new InputMetrics(
- bytesRead = internal.bytesRead,
- recordsRead = internal.recordsRead
- )
+ def convertInputMetrics(internal: InternalInputMetrics): Option[InputMetrics] = {
+ if (internal.isUpdated) {
+ Some(new InputMetrics(
+ bytesRead = internal.bytesRead,
+ recordsRead = internal.recordsRead
+ ))
+ } else {
+ None
+ }
}
- def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
- new OutputMetrics(
- bytesWritten = internal.bytesWritten,
- recordsWritten = internal.recordsWritten
- )
+ def convertOutputMetrics(internal: InternalOutputMetrics): Option[OutputMetrics] = {
+ if (internal.isUpdated) {
+ Some(new OutputMetrics(
+ bytesWritten = internal.bytesWritten,
+ recordsWritten = internal.recordsWritten
+ ))
+ } else {
+ None
+ }
}
- def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
- new ShuffleReadMetrics(
- remoteBlocksFetched = internal.remoteBlocksFetched,
- localBlocksFetched = internal.localBlocksFetched,
- fetchWaitTime = internal.fetchWaitTime,
- remoteBytesRead = internal.remoteBytesRead,
- totalBlocksFetched = internal.totalBlocksFetched,
- recordsRead = internal.recordsRead
- )
+ def convertShuffleReadMetrics(
+ internal: InternalShuffleReadMetrics): Option[ShuffleReadMetrics] = {
+ if (internal.isUpdated) {
+ Some(new ShuffleReadMetrics(
+ remoteBlocksFetched = internal.remoteBlocksFetched,
+ localBlocksFetched = internal.localBlocksFetched,
+ fetchWaitTime = internal.fetchWaitTime,
+ remoteBytesRead = internal.remoteBytesRead,
+ totalBlocksFetched = internal.totalBlocksFetched,
+ recordsRead = internal.recordsRead
+ ))
+ } else {
+ None
+ }
}
- def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
- new ShuffleWriteMetrics(
- bytesWritten = internal.bytesWritten,
- writeTime = internal.writeTime,
- recordsWritten = internal.recordsWritten
- )
+ def convertShuffleWriteMetrics(
+ internal: InternalShuffleWriteMetrics): Option[ShuffleWriteMetrics] = {
+ if ((internal.bytesWritten | internal.writeTime | internal.recordsWritten) == 0) {
+ None
+ } else {
+ Some(new ShuffleWriteMetrics(
+ bytesWritten = internal.bytesWritten,
+ writeTime = internal.writeTime,
+ recordsWritten = internal.recordsWritten
+ ))
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 4ec5b4bbb0..4dc2f36232 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -108,7 +108,7 @@ final class ShuffleBlockFetcherIterator(
/** Current number of requests in flight */
private[this] var reqsInFlight = 0
- private[this] val shuffleMetrics = context.taskMetrics().registerTempShuffleReadMetrics()
+ private[this] val shuffleMetrics = context.taskMetrics().createTempShuffleReadMetrics()
/**
* Whether the iterator is still active. If isZombie is true, the callback interface will no
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 3fd0efd3a1..676f445751 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -119,26 +119,19 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
// Update shuffle read/write
val metrics = taskEnd.taskMetrics
if (metrics != null) {
- metrics.inputMetrics.foreach { inputMetrics =>
- executorToInputBytes(eid) =
- executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
- executorToInputRecords(eid) =
- executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead
- }
- metrics.outputMetrics.foreach { outputMetrics =>
- executorToOutputBytes(eid) =
- executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
- executorToOutputRecords(eid) =
- executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten
- }
- metrics.shuffleReadMetrics.foreach { shuffleRead =>
- executorToShuffleRead(eid) =
- executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
- }
- metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
- executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
- }
+ executorToInputBytes(eid) =
+ executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead
+ executorToInputRecords(eid) =
+ executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead
+ executorToOutputBytes(eid) =
+ executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten
+ executorToOutputRecords(eid) =
+ executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten
+
+ executorToShuffleRead(eid) =
+ executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead
+ executorToShuffleWrite(eid) =
+ executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten
executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 13f5f84d06..9e4771ce4a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -434,50 +434,50 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
val shuffleWriteDelta =
- (taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L))
+ taskMetrics.shuffleWriteMetrics.bytesWritten -
+ oldMetrics.map(_.shuffleWriteMetrics.bytesWritten).getOrElse(0L)
stageData.shuffleWriteBytes += shuffleWriteDelta
execSummary.shuffleWrite += shuffleWriteDelta
val shuffleWriteRecordsDelta =
- (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L))
+ taskMetrics.shuffleWriteMetrics.recordsWritten -
+ oldMetrics.map(_.shuffleWriteMetrics.recordsWritten).getOrElse(0L)
stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
val shuffleReadDelta =
- (taskMetrics.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L))
+ taskMetrics.shuffleReadMetrics.totalBytesRead -
+ oldMetrics.map(_.shuffleReadMetrics.totalBytesRead).getOrElse(0L)
stageData.shuffleReadTotalBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta
val shuffleReadRecordsDelta =
- (taskMetrics.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L)
- - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.recordsRead).getOrElse(0L))
+ taskMetrics.shuffleReadMetrics.recordsRead -
+ oldMetrics.map(_.shuffleReadMetrics.recordsRead).getOrElse(0L)
stageData.shuffleReadRecords += shuffleReadRecordsDelta
execSummary.shuffleReadRecords += shuffleReadRecordsDelta
val inputBytesDelta =
- (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
- - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+ taskMetrics.inputMetrics.bytesRead -
+ oldMetrics.map(_.inputMetrics.bytesRead).getOrElse(0L)
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta
val inputRecordsDelta =
- (taskMetrics.inputMetrics.map(_.recordsRead).getOrElse(0L)
- - oldMetrics.flatMap(_.inputMetrics).map(_.recordsRead).getOrElse(0L))
+ taskMetrics.inputMetrics.recordsRead -
+ oldMetrics.map(_.inputMetrics.recordsRead).getOrElse(0L)
stageData.inputRecords += inputRecordsDelta
execSummary.inputRecords += inputRecordsDelta
val outputBytesDelta =
- (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
- - oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
+ taskMetrics.outputMetrics.bytesWritten -
+ oldMetrics.map(_.outputMetrics.bytesWritten).getOrElse(0L)
stageData.outputBytes += outputBytesDelta
execSummary.outputBytes += outputBytesDelta
val outputRecordsDelta =
- (taskMetrics.outputMetrics.map(_.recordsWritten).getOrElse(0L)
- - oldMetrics.flatMap(_.outputMetrics).map(_.recordsWritten).getOrElse(0L))
+ taskMetrics.outputMetrics.recordsWritten -
+ oldMetrics.map(_.outputMetrics.recordsWritten).getOrElse(0L)
stageData.outputRecords += outputRecordsDelta
execSummary.outputRecords += outputRecordsDelta
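
Every block in this listener hunk computes the same shape of value: the task's current counter minus the counter from the optional previous metrics snapshot. A compact sketch of that shape; the helper name and factoring are illustrative, not part of the patch:

    import org.apache.spark.executor.TaskMetrics

    // Delta of one Long metric against an optional previous snapshot.
    def metricDelta(current: TaskMetrics, old: Option[TaskMetrics])(get: TaskMetrics => Long): Long =
      get(current) - old.map(get).getOrElse(0L)

    // e.g. metricDelta(taskMetrics, oldMetrics)(_.shuffleWriteMetrics.bytesWritten)
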
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8a44bbd9fc..5d1928ac6b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -428,29 +428,29 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}
val inputSizes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ taskUIData.metrics.get.inputMetrics.bytesRead.toDouble
}
val inputRecords = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ taskUIData.metrics.get.inputMetrics.recordsRead.toDouble
}
val inputQuantiles = <td>Input Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
val outputSizes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ taskUIData.metrics.get.outputMetrics.bytesWritten.toDouble
}
val outputRecords = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ taskUIData.metrics.get.outputMetrics.recordsWritten.toDouble
}
val outputQuantiles = <td>Output Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
+ taskUIData.metrics.get.shuffleReadMetrics.fetchWaitTime.toDouble
}
val shuffleReadBlockedQuantiles =
<td>
@@ -462,10 +462,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedTimeQuantiles(shuffleReadBlockedTimes)
val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
+ taskUIData.metrics.get.shuffleReadMetrics.totalBytesRead.toDouble
}
val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ taskUIData.metrics.get.shuffleReadMetrics.recordsRead.toDouble
}
val shuffleReadTotalQuantiles =
<td>
@@ -477,7 +477,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ taskUIData.metrics.get.shuffleReadMetrics.remoteBytesRead.toDouble
}
val shuffleReadRemoteQuantiles =
<td>
@@ -489,11 +489,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedSizeQuantiles(shuffleReadRemoteSizes)
val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ taskUIData.metrics.get.shuffleWriteMetrics.bytesWritten.toDouble
}
val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData =>
- taskUIData.metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ taskUIData.metrics.get.shuffleWriteMetrics.recordsWritten.toDouble
}
val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
@@ -603,11 +603,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val metricsOpt = taskUIData.metrics
val shuffleReadTime =
- metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L)
+ metricsOpt.map(_.shuffleReadMetrics.fetchWaitTime).getOrElse(0L)
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
val shuffleWriteTime =
- (metricsOpt.flatMap(_.shuffleWriteMetrics
- .map(_.writeTime)).getOrElse(0L) / 1e6).toLong
+ (metricsOpt.map(_.shuffleWriteMetrics.writeTime).getOrElse(0L) / 1e6).toLong
val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)
val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
@@ -890,21 +889,21 @@ private[ui] class TaskDataSource(
}
val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L)
- val maybeInput = metrics.flatMap(_.inputMetrics)
+ val maybeInput = metrics.map(_.inputMetrics)
val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
val inputReadable = maybeInput
- .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+ .map(m => s"${Utils.bytesToString(m.bytesRead)}")
.getOrElse("")
val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
- val maybeOutput = metrics.flatMap(_.outputMetrics)
+ val maybeOutput = metrics.map(_.outputMetrics)
val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L)
val outputReadable = maybeOutput
.map(m => s"${Utils.bytesToString(m.bytesWritten)}")
.getOrElse("")
val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
- val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
+ val maybeShuffleRead = metrics.map(_.shuffleReadMetrics)
val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L)
val shuffleReadBlockedTimeReadable =
maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
@@ -918,14 +917,14 @@ private[ui] class TaskDataSource(
val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L)
val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
- val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
+ val maybeShuffleWrite = metrics.map(_.shuffleWriteMetrics)
val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
val shuffleWriteReadable = maybeShuffleWrite
.map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
val shuffleWriteRecords = maybeShuffleWrite
.map(_.recordsWritten.toString).getOrElse("")
- val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime)
+ val maybeWriteTime = metrics.map(_.shuffleWriteMetrics.writeTime)
val writeTimeSortable = maybeWriteTime.getOrElse(0L)
val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
if (ms == 0) "" else UIUtils.formatDuration(ms)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 558767e36f..17b33c7c62 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -326,33 +326,35 @@ private[spark] object JsonProtocol {
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
- val shuffleReadMetrics: JValue =
- taskMetrics.shuffleReadMetrics.map { rm =>
- ("Remote Blocks Fetched" -> rm.remoteBlocksFetched) ~
- ("Local Blocks Fetched" -> rm.localBlocksFetched) ~
- ("Fetch Wait Time" -> rm.fetchWaitTime) ~
- ("Remote Bytes Read" -> rm.remoteBytesRead) ~
- ("Local Bytes Read" -> rm.localBytesRead) ~
- ("Total Records Read" -> rm.recordsRead)
- }.getOrElse(JNothing)
- val shuffleWriteMetrics: JValue =
- taskMetrics.shuffleWriteMetrics.map { wm =>
- ("Shuffle Bytes Written" -> wm.bytesWritten) ~
- ("Shuffle Write Time" -> wm.writeTime) ~
- ("Shuffle Records Written" -> wm.recordsWritten)
- }.getOrElse(JNothing)
- val inputMetrics: JValue =
- taskMetrics.inputMetrics.map { im =>
- ("Data Read Method" -> im.readMethod.toString) ~
- ("Bytes Read" -> im.bytesRead) ~
- ("Records Read" -> im.recordsRead)
- }.getOrElse(JNothing)
- val outputMetrics: JValue =
- taskMetrics.outputMetrics.map { om =>
- ("Data Write Method" -> om.writeMethod.toString) ~
- ("Bytes Written" -> om.bytesWritten) ~
- ("Records Written" -> om.recordsWritten)
- }.getOrElse(JNothing)
+ val shuffleReadMetrics: JValue = if (taskMetrics.shuffleReadMetrics.isUpdated) {
+ ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
+ ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
+ ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
+ ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
+ ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
+ ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
+ } else {
+ JNothing
+ }
+ val shuffleWriteMetrics: JValue = if (taskMetrics.shuffleWriteMetrics.isUpdated) {
+ ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
+ ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
+ ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
+ } else {
+ JNothing
+ }
+ val inputMetrics: JValue = if (taskMetrics.inputMetrics.isUpdated) {
+ ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
+ ("Records Read" -> taskMetrics.inputMetrics.recordsRead)
+ } else {
+ JNothing
+ }
+ val outputMetrics: JValue = if (taskMetrics.outputMetrics.isUpdated) {
+ ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
+ ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
+ } else {
+ JNothing
+ }
val updatedBlocks =
JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
@@ -781,7 +783,7 @@ private[spark] object JsonProtocol {
// Shuffle read metrics
Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
- val readMetrics = metrics.registerTempShuffleReadMetrics()
+ val readMetrics = metrics.createTempShuffleReadMetrics()
readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
@@ -794,7 +796,7 @@ private[spark] object JsonProtocol {
// Shuffle write metrics
// TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
- val writeMetrics = metrics.registerShuffleWriteMetrics()
+ val writeMetrics = metrics.shuffleWriteMetrics
writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
.extractOpt[Long].getOrElse(0L))
@@ -803,16 +805,14 @@ private[spark] object JsonProtocol {
// Output metrics
Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
- val writeMethod = DataWriteMethod.withName((outJson \ "Data Write Method").extract[String])
- val outputMetrics = metrics.registerOutputMetrics(writeMethod)
+ val outputMetrics = metrics.outputMetrics
outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L))
}
// Input metrics
Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
- val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String])
- val inputMetrics = metrics.registerInputMetrics(readMethod)
+ val inputMetrics = metrics.inputMetrics
inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
}
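
For event-log consumers, the net effect of the JsonProtocol changes is that "Data Read Method" and "Data Write Method" are no longer written, and a sub-metrics object is omitted entirely (JNothing) when none of its counters were touched. A hedged consumer-side sketch using only the json4s calls already seen above (parse, \, extractOpt); the function and its field lookup are illustrative, not part of Spark:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    // Read "Bytes Read" from a "Task Metrics" JSON object, treating a missing
    // "Input Metrics" block (now dropped when nothing was read) as zero.
    def inputBytesRead(taskMetricsJson: String): Long = {
      implicit val formats: Formats = DefaultFormats
      (parse(taskMetricsJson) \ "Input Metrics" \ "Bytes Read")
        .extractOpt[Long]
        .getOrElse(0L)
    }
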
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 561ba22df5..916053f42d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -645,7 +645,7 @@ private[spark] class ExternalSorter[K, V, C](
blockId: BlockId,
outputFile: File): Array[Long] = {
- val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics()
+ val writeMetrics = context.taskMetrics().shuffleWriteMetrics
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 30750b1bf1..fbaaa1cf49 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -249,8 +249,8 @@ public class UnsafeShuffleWriterSuite {
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
- assertEquals(0, taskMetrics.shuffleWriteMetrics().get().recordsWritten());
- assertEquals(0, taskMetrics.shuffleWriteMetrics().get().bytesWritten());
+ assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
+ assertEquals(0, taskMetrics.shuffleWriteMetrics().bytesWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
}
@@ -279,7 +279,7 @@ public class UnsafeShuffleWriterSuite {
HashMultiset.create(dataToWrite),
HashMultiset.create(readRecordsFromFile()));
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
@@ -321,7 +321,7 @@ public class UnsafeShuffleWriterSuite {
assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
@@ -383,7 +383,7 @@ public class UnsafeShuffleWriterSuite {
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
@@ -404,7 +404,7 @@ public class UnsafeShuffleWriterSuite {
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
- ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get();
+ ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics();
assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten());
assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L));
assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length()));
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 474550608b..db087a9c3c 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -59,11 +59,9 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
assert(getParam(shuffleWrite.RECORDS_WRITTEN) === LongAccumulatorParam)
assert(getParam(shuffleWrite.WRITE_TIME) === LongAccumulatorParam)
// input
- assert(getParam(input.READ_METHOD) === StringAccumulatorParam)
assert(getParam(input.RECORDS_READ) === LongAccumulatorParam)
assert(getParam(input.BYTES_READ) === LongAccumulatorParam)
// output
- assert(getParam(output.WRITE_METHOD) === StringAccumulatorParam)
assert(getParam(output.RECORDS_WRITTEN) === LongAccumulatorParam)
assert(getParam(output.BYTES_WRITTEN) === LongAccumulatorParam)
// default to Long
@@ -77,18 +75,15 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
val executorRunTime = create(EXECUTOR_RUN_TIME)
val updatedBlockStatuses = create(UPDATED_BLOCK_STATUSES)
val shuffleRemoteBlocksRead = create(shuffleRead.REMOTE_BLOCKS_FETCHED)
- val inputReadMethod = create(input.READ_METHOD)
assert(executorRunTime.name === Some(EXECUTOR_RUN_TIME))
assert(updatedBlockStatuses.name === Some(UPDATED_BLOCK_STATUSES))
assert(shuffleRemoteBlocksRead.name === Some(shuffleRead.REMOTE_BLOCKS_FETCHED))
- assert(inputReadMethod.name === Some(input.READ_METHOD))
assert(executorRunTime.value.isInstanceOf[Long])
assert(updatedBlockStatuses.value.isInstanceOf[Seq[_]])
// We cannot assert the type of the value directly since the type parameter is erased.
// Instead, try casting a `Seq` of expected type and see if it fails in run time.
updatedBlockStatuses.setValueAny(Seq.empty[(BlockId, BlockStatus)])
assert(shuffleRemoteBlocksRead.value.isInstanceOf[Int])
- assert(inputReadMethod.value.isInstanceOf[String])
// default to Long
val anything = create(METRICS_PREFIX + "anything")
assert(anything.value.isInstanceOf[Long])
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index cd7d2e1570..079109d137 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -450,14 +450,10 @@ object ShuffleSuite {
@volatile var bytesRead: Long = 0
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskEnd.taskMetrics.shuffleWriteMetrics.foreach { m =>
- recordsWritten += m.recordsWritten
- bytesWritten += m.bytesWritten
- }
- taskEnd.taskMetrics.shuffleReadMetrics.foreach { m =>
- recordsRead += m.recordsRead
- bytesRead += m.totalBytesRead
- }
+ recordsWritten += taskEnd.taskMetrics.shuffleWriteMetrics.recordsWritten
+ bytesWritten += taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten
+ recordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
+ bytesRead += taskEnd.taskMetrics.shuffleReadMetrics.totalBytesRead
}
}
sc.addSparkListener(listener)
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index d91f50f18f..a263fce8ab 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -30,24 +30,6 @@ class TaskMetricsSuite extends SparkFunSuite {
import StorageLevel._
import TaskMetricsSuite._
- test("create") {
- val internalAccums = InternalAccumulator.createAll()
- val tm1 = new TaskMetrics
- val tm2 = new TaskMetrics(internalAccums)
- assert(tm1.accumulatorUpdates().size === internalAccums.size)
- assert(tm1.shuffleReadMetrics.isEmpty)
- assert(tm1.shuffleWriteMetrics.isEmpty)
- assert(tm1.inputMetrics.isEmpty)
- assert(tm1.outputMetrics.isEmpty)
- assert(tm2.accumulatorUpdates().size === internalAccums.size)
- assert(tm2.shuffleReadMetrics.isEmpty)
- assert(tm2.shuffleWriteMetrics.isEmpty)
- assert(tm2.inputMetrics.isEmpty)
- assert(tm2.outputMetrics.isEmpty)
- // TaskMetrics constructor expects minimal set of initial accumulators
- intercept[IllegalArgumentException] { new TaskMetrics(Seq.empty[Accumulator[_]]) }
- }
-
test("create with unnamed accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
@@ -110,11 +92,9 @@ class TaskMetricsSuite extends SparkFunSuite {
.map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
accums(BYTES_READ).setValueAny(1L)
accums(RECORDS_READ).setValueAny(2L)
- accums(READ_METHOD).setValueAny(DataReadMethod.Hadoop.toString)
val im = new InputMetrics(accums)
assert(im.bytesRead === 1L)
assert(im.recordsRead === 2L)
- assert(im.readMethod === DataReadMethod.Hadoop)
}
test("create output metrics") {
@@ -123,11 +103,9 @@ class TaskMetricsSuite extends SparkFunSuite {
.map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
accums(BYTES_WRITTEN).setValueAny(1L)
accums(RECORDS_WRITTEN).setValueAny(2L)
- accums(WRITE_METHOD).setValueAny(DataWriteMethod.Hadoop.toString)
val om = new OutputMetrics(accums)
assert(om.bytesWritten === 1L)
assert(om.recordsWritten === 2L)
- assert(om.writeMethod === DataWriteMethod.Hadoop)
}
test("mutating values") {
@@ -183,14 +161,12 @@ class TaskMetricsSuite extends SparkFunSuite {
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
+ assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics), accums, name, value)
}
// create shuffle read metrics
- assert(tm.shuffleReadMetrics.isEmpty)
- tm.registerTempShuffleReadMetrics()
+ tm.createTempShuffleReadMetrics()
tm.mergeShuffleReadMetrics()
- assert(tm.shuffleReadMetrics.isDefined)
- val sr = tm.shuffleReadMetrics.get
+ val sr = tm.shuffleReadMetrics
// initial values
assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
@@ -237,13 +213,10 @@ class TaskMetricsSuite extends SparkFunSuite {
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
+ assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics), accums, name, value)
}
// create shuffle write metrics
- assert(tm.shuffleWriteMetrics.isEmpty)
- tm.registerShuffleWriteMetrics()
- assert(tm.shuffleWriteMetrics.isDefined)
- val sw = tm.shuffleWriteMetrics.get
+ val sw = tm.shuffleWriteMetrics
// initial values
assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
@@ -270,28 +243,22 @@ class TaskMetricsSuite extends SparkFunSuite {
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
+ assertValueEquals(tm, tm => tmValue(tm.inputMetrics), accums, name, value,
(x: Any, y: Any) => assert(x.toString === y.toString))
}
// create input metrics
- assert(tm.inputMetrics.isEmpty)
- tm.registerInputMetrics(DataReadMethod.Memory)
- assert(tm.inputMetrics.isDefined)
- val in = tm.inputMetrics.get
+ val in = tm.inputMetrics
// initial values
assertValEquals(_.bytesRead, BYTES_READ, 0L)
assertValEquals(_.recordsRead, RECORDS_READ, 0L)
- assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Memory)
// set and increment values
in.setBytesRead(1L)
in.setBytesRead(2L)
in.incRecordsRead(1L)
in.incRecordsRead(2L)
- in.setReadMethod(DataReadMethod.Disk)
// assert new values exist
assertValEquals(_.bytesRead, BYTES_READ, 2L)
assertValEquals(_.recordsRead, RECORDS_READ, 3L)
- assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Disk)
}
test("mutating output metrics values") {
@@ -299,85 +266,42 @@ class TaskMetricsSuite extends SparkFunSuite {
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
+ assertValueEquals(tm, tm => tmValue(tm.outputMetrics), accums, name, value,
(x: Any, y: Any) => assert(x.toString === y.toString))
}
    // create output metrics
- assert(tm.outputMetrics.isEmpty)
- tm.registerOutputMetrics(DataWriteMethod.Hadoop)
- assert(tm.outputMetrics.isDefined)
- val out = tm.outputMetrics.get
+ val out = tm.outputMetrics
// initial values
assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
- assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
// set values
out.setBytesWritten(1L)
out.setBytesWritten(2L)
out.setRecordsWritten(3L)
out.setRecordsWritten(4L)
- out.setWriteMethod(DataWriteMethod.Hadoop)
// assert new values exist
assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
- // Note: this doesn't actually test anything, but there's only one DataWriteMethod
- // so we can't set it to anything else
- assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
}
test("merging multiple shuffle read metrics") {
val tm = new TaskMetrics
- assert(tm.shuffleReadMetrics.isEmpty)
- val sr1 = tm.registerTempShuffleReadMetrics()
- val sr2 = tm.registerTempShuffleReadMetrics()
- val sr3 = tm.registerTempShuffleReadMetrics()
- assert(tm.shuffleReadMetrics.isEmpty)
+ val sr1 = tm.createTempShuffleReadMetrics()
+ val sr2 = tm.createTempShuffleReadMetrics()
+ val sr3 = tm.createTempShuffleReadMetrics()
sr1.setRecordsRead(10L)
sr2.setRecordsRead(10L)
sr1.setFetchWaitTime(1L)
sr2.setFetchWaitTime(2L)
sr3.setFetchWaitTime(3L)
tm.mergeShuffleReadMetrics()
- assert(tm.shuffleReadMetrics.isDefined)
- val sr = tm.shuffleReadMetrics.get
- assert(sr.remoteBlocksFetched === 0L)
- assert(sr.recordsRead === 20L)
- assert(sr.fetchWaitTime === 6L)
+ assert(tm.shuffleReadMetrics.remoteBlocksFetched === 0L)
+ assert(tm.shuffleReadMetrics.recordsRead === 20L)
+ assert(tm.shuffleReadMetrics.fetchWaitTime === 6L)
// SPARK-5701: calling merge without any shuffle deps does nothing
val tm2 = new TaskMetrics
tm2.mergeShuffleReadMetrics()
- assert(tm2.shuffleReadMetrics.isEmpty)
- }
-
- test("register multiple shuffle write metrics") {
- val tm = new TaskMetrics
- val sw1 = tm.registerShuffleWriteMetrics()
- val sw2 = tm.registerShuffleWriteMetrics()
- assert(sw1 === sw2)
- assert(tm.shuffleWriteMetrics === Some(sw1))
- }
-
- test("register multiple input metrics") {
- val tm = new TaskMetrics
- val im1 = tm.registerInputMetrics(DataReadMethod.Memory)
- val im2 = tm.registerInputMetrics(DataReadMethod.Memory)
- // input metrics with a different read method than the one already registered are ignored
- val im3 = tm.registerInputMetrics(DataReadMethod.Hadoop)
- assert(im1 === im2)
- assert(im1 !== im3)
- assert(tm.inputMetrics === Some(im1))
- im2.setBytesRead(50L)
- im3.setBytesRead(100L)
- assert(tm.inputMetrics.get.bytesRead === 50L)
- }
-
- test("register multiple output metrics") {
- val tm = new TaskMetrics
- val om1 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
- val om2 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
- assert(om1 === om2)
- assert(tm.outputMetrics === Some(om1))
}
test("additional accumulables") {
@@ -424,10 +348,6 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(srAccum.isDefined)
srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
val tm = new TaskMetrics(accums)
- assert(tm.shuffleReadMetrics.isDefined)
- assert(tm.shuffleWriteMetrics.isEmpty)
- assert(tm.inputMetrics.isEmpty)
- assert(tm.outputMetrics.isEmpty)
}
test("existing values in shuffle write accums") {
@@ -437,10 +357,6 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(swAccum.isDefined)
swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
val tm = new TaskMetrics(accums)
- assert(tm.shuffleReadMetrics.isEmpty)
- assert(tm.shuffleWriteMetrics.isDefined)
- assert(tm.inputMetrics.isEmpty)
- assert(tm.outputMetrics.isEmpty)
}
test("existing values in input accums") {
@@ -450,10 +366,6 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(inAccum.isDefined)
inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
val tm = new TaskMetrics(accums)
- assert(tm.shuffleReadMetrics.isEmpty)
- assert(tm.shuffleWriteMetrics.isEmpty)
- assert(tm.inputMetrics.isDefined)
- assert(tm.outputMetrics.isEmpty)
}
test("existing values in output accums") {
@@ -463,10 +375,6 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(outAccum.isDefined)
outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
val tm4 = new TaskMetrics(accums)
- assert(tm4.shuffleReadMetrics.isEmpty)
- assert(tm4.shuffleWriteMetrics.isEmpty)
- assert(tm4.inputMetrics.isEmpty)
- assert(tm4.outputMetrics.isDefined)
}
test("from accumulator updates") {
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 056e5463a0..f8054f5fd7 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -25,16 +25,10 @@ import org.apache.commons.lang3.RandomUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit,
- JobConf, LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader,
- Reporter, TextInputFormat => OldTextInputFormat}
-import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat,
- CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit}
-import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader,
- TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat,
- CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit,
- FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
+import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf, LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter, TextInputFormat => OldTextInputFormat}
+import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat, CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit}
+import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat, CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit, FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.scalatest.BeforeAndAfter
@@ -103,40 +97,6 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
assert(bytesRead2 == bytesRead)
}
- /**
- * This checks the situation where we have interleaved reads from
- * different sources. Currently, we only accumulate from the first
- * read method we find in the task. This test uses cartesian to create
- * the interleaved reads.
- *
- * Once https://issues.apache.org/jira/browse/SPARK-5225 is fixed
- * this test should break.
- */
- test("input metrics with mixed read method") {
- // prime the cache manager
- val numPartitions = 2
- val rdd = sc.parallelize(1 to 100, numPartitions).cache()
- rdd.collect()
-
- val rdd2 = sc.textFile(tmpFilePath, numPartitions)
-
- val bytesRead = runAndReturnBytesRead {
- rdd.count()
- }
- val bytesRead2 = runAndReturnBytesRead {
- rdd2.count()
- }
-
- val cartRead = runAndReturnBytesRead {
- rdd.cartesian(rdd2).count()
- }
-
- assert(cartRead != 0)
- assert(bytesRead != 0)
- // We read from the first rdd of the cartesian once per partition.
- assert(cartRead == bytesRead * numPartitions)
- }
-
test("input metrics for new Hadoop API with coalesce") {
val bytesRead = runAndReturnBytesRead {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
@@ -209,10 +169,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val metrics = taskEnd.taskMetrics
- metrics.inputMetrics.foreach(inputRead += _.recordsRead)
- metrics.outputMetrics.foreach(outputWritten += _.recordsWritten)
- metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead)
- metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten)
+ inputRead += metrics.inputMetrics.recordsRead
+ outputWritten += metrics.outputMetrics.recordsWritten
+ shuffleRead += metrics.shuffleReadMetrics.recordsRead
+ shuffleWritten += metrics.shuffleWriteMetrics.recordsWritten
}
})
@@ -272,19 +232,18 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
}
private def runAndReturnBytesRead(job: => Unit): Long = {
- runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.bytesRead))
+ runAndReturnMetrics(job, _.taskMetrics.inputMetrics.bytesRead)
}
private def runAndReturnRecordsRead(job: => Unit): Long = {
- runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.recordsRead))
+ runAndReturnMetrics(job, _.taskMetrics.inputMetrics.recordsRead)
}
private def runAndReturnRecordsWritten(job: => Unit): Long = {
- runAndReturnMetrics(job, _.taskMetrics.outputMetrics.map(_.recordsWritten))
+ runAndReturnMetrics(job, _.taskMetrics.outputMetrics.recordsWritten)
}
- private def runAndReturnMetrics(job: => Unit,
- collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
+ private def runAndReturnMetrics(job: => Unit, collector: (SparkListenerTaskEnd) => Long): Long = {
val taskMetrics = new ArrayBuffer[Long]()
// Avoid receiving earlier taskEnd events
@@ -292,7 +251,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- collector(taskEnd).foreach(taskMetrics += _)
+ taskMetrics += collector(taskEnd)
}
})
@@ -337,7 +296,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
val taskBytesWritten = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskBytesWritten += taskEnd.taskMetrics.outputMetrics.get.bytesWritten
+ taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
}
})
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index b854d742b5..5ba67afc0c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -266,18 +266,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0L)
if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
- taskMetrics.inputMetrics should not be ('defined)
- taskMetrics.outputMetrics should not be ('defined)
- taskMetrics.shuffleWriteMetrics should be ('defined)
- taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L)
+ assert(taskMetrics.shuffleWriteMetrics.bytesWritten > 0L)
}
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
- taskMetrics.shuffleReadMetrics should be ('defined)
- val sm = taskMetrics.shuffleReadMetrics.get
- sm.totalBlocksFetched should be (2*numSlices)
- sm.localBlocksFetched should be (2*numSlices)
- sm.remoteBlocksFetched should be (0)
- sm.remoteBytesRead should be (0L)
+ assert(taskMetrics.shuffleReadMetrics.totalBlocksFetched == 2 * numSlices)
+ assert(taskMetrics.shuffleReadMetrics.localBlocksFetched == 2 * numSlices)
+ assert(taskMetrics.shuffleReadMetrics.remoteBlocksFetched == 0)
+ assert(taskMetrics.shuffleReadMetrics.remoteBytesRead == 0L)
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 16418f855b..5132384a5e 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -144,7 +144,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(outputFile.exists())
assert(outputFile.length() === 0)
assert(temporaryFilesCreated.isEmpty)
- val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
+ val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
assert(shuffleWriteMetrics.bytesWritten === 0)
assert(shuffleWriteMetrics.recordsWritten === 0)
assert(taskMetrics.diskBytesSpilled === 0)
@@ -168,7 +168,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(writer.getPartitionLengths.sum === outputFile.length())
assert(writer.getPartitionLengths.count(_ == 0L) === 4) // should be 4 zero length files
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
- val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
+ val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
assert(shuffleWriteMetrics.recordsWritten === records.length)
assert(taskMetrics.diskBytesSpilled === 0)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7d4c0863bc..85c877e3dd 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -184,7 +184,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
- val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
+ val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
@@ -272,10 +272,10 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val accums = InternalAccumulator.createAll()
accums.foreach(Accumulators.register)
val taskMetrics = new TaskMetrics(accums)
- val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
- val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
- val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
- val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
+ val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
+ val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
+ val inputMetrics = taskMetrics.inputMetrics
+ val outputMetrics = taskMetrics.outputMetrics
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
@@ -322,12 +322,13 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 207)
assert(stage0Data.outputBytes == 116)
assert(stage1Data.outputBytes == 208)
- assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
- .totalBlocksFetched == 2)
- assert(stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.get
- .totalBlocksFetched == 102)
- assert(stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.get
- .totalBlocksFetched == 202)
+
+ assert(
+ stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 2)
+ assert(
+ stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 102)
+ assert(
+ stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 202)
// task that was included in a heartbeat
listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
@@ -355,9 +356,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 614)
assert(stage0Data.outputBytes == 416)
assert(stage1Data.outputBytes == 616)
- assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
- .totalBlocksFetched == 302)
- assert(stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.get
- .totalBlocksFetched == 402)
+ assert(
+ stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 302)
+ assert(
+ stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402)
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index de6f408fa8..612c7c1954 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -197,49 +197,41 @@ class JsonProtocolSuite extends SparkFunSuite {
test("InputMetrics backward compatibility") {
// InputMetrics were added after 1.0.1.
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false)
- assert(metrics.inputMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
- assert(newMetrics.inputMetrics.isEmpty)
}
test("Input/Output records backwards compatibility") {
// records read were added after 1.2
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
hasHadoopInput = true, hasOutput = true, hasRecords = false)
- assert(metrics.inputMetrics.nonEmpty)
- assert(metrics.outputMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Records Read" }
.removeField { case (field, _) => field == "Records Written" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
- assert(newMetrics.inputMetrics.get.recordsRead == 0)
- assert(newMetrics.outputMetrics.get.recordsWritten == 0)
+ assert(newMetrics.inputMetrics.recordsRead == 0)
+ assert(newMetrics.outputMetrics.recordsWritten == 0)
}
test("Shuffle Read/Write records backwards compatibility") {
// records read were added after 1.2
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
hasHadoopInput = false, hasOutput = false, hasRecords = false)
- assert(metrics.shuffleReadMetrics.nonEmpty)
- assert(metrics.shuffleWriteMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" }
.removeField { case (field, _) => field == "Shuffle Records Written" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
- assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
- assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0)
+ assert(newMetrics.shuffleReadMetrics.recordsRead == 0)
+ assert(newMetrics.shuffleWriteMetrics.recordsWritten == 0)
}
test("OutputMetrics backward compatibility") {
// OutputMetrics were added after 1.1
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
- assert(metrics.outputMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Output Metrics" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
- assert(newMetrics.outputMetrics.isEmpty)
}
test("BlockManager events backward compatibility") {
@@ -279,11 +271,10 @@ class JsonProtocolSuite extends SparkFunSuite {
// Metrics about local shuffle bytes read were added in 1.3.1.
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
hasHadoopInput = false, hasOutput = false, hasRecords = false)
- assert(metrics.shuffleReadMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
- assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
+ assert(newMetrics.shuffleReadMetrics.localBytesRead == 0)
}
test("SparkListenerApplicationStart backwards compatibility") {
@@ -423,7 +414,6 @@ class JsonProtocolSuite extends SparkFunSuite {
})
testAccumValue(Some(RESULT_SIZE), 3L, JInt(3))
testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2))
- testAccumValue(Some(input.READ_METHOD), "aka", JString("aka"))
testAccumValue(Some(UPDATED_BLOCK_STATUSES), blocks, blocksJson)
// For anything else, we just cast the value to a string
testAccumValue(Some("anything"), blocks, JString(blocks.toString))
@@ -619,12 +609,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
assert(metrics1.resultSerializationTime === metrics2.resultSerializationTime)
assert(metrics1.memoryBytesSpilled === metrics2.memoryBytesSpilled)
assert(metrics1.diskBytesSpilled === metrics2.diskBytesSpilled)
- assertOptionEquals(
- metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
- assertOptionEquals(
- metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
- assertOptionEquals(
- metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
+ assertEquals(metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics)
+ assertEquals(metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics)
+ assertEquals(metrics1.inputMetrics, metrics2.inputMetrics)
assertBlocksEquals(metrics1.updatedBlockStatuses, metrics2.updatedBlockStatuses)
}
@@ -641,7 +628,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
}
private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
- assert(metrics1.readMethod === metrics2.readMethod)
assert(metrics1.bytesRead === metrics2.bytesRead)
}
@@ -706,12 +692,13 @@ private[spark] object JsonProtocolSuite extends Assertions {
}
private def assertJsonStringEquals(expected: String, actual: String, metadata: String) {
- val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
- if (formatJsonString(expected) != formatJsonString(actual)) {
+ val expectedJson = pretty(parse(expected))
+ val actualJson = pretty(parse(actual))
+ if (expectedJson != actualJson) {
// scalastyle:off
// This prints something useful if the JSON strings don't match
- println("=== EXPECTED ===\n" + pretty(parse(expected)) + "\n")
- println("=== ACTUAL ===\n" + pretty(parse(actual)) + "\n")
+ println("=== EXPECTED ===\n" + expectedJson + "\n")
+ println("=== ACTUAL ===\n" + actualJson + "\n")
// scalastyle:on
throw new TestFailedException(s"$metadata JSON did not equal", 1)
}
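Why the test JSON literals throughout the rest of this file gain .stripMargin: the old comparison stripped all whitespace and '|' characters, so the margin pipes never reached a parser, whereas the new comparison runs both strings through parse, which would reject them. A small json4s illustration (not part of the patch):

    import org.json4s.jackson.JsonMethods._

    val raw =
      """
        |{
        |  "Event": "SparkListenerUnpersistRDD",
        |  "RDD ID": 12345
        |}
      """
    // parse(raw) would throw: the leading '|' margin characters are not valid JSON.
    // After stripMargin they are gone, and pretty(parse(...)) yields a normalized string
    // that can be compared directly, as assertJsonStringEquals now does.
    val normalized = pretty(parse(raw.stripMargin))
    println(normalized)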
@@ -740,22 +727,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
* Use different names for methods we pass in to assertSeqEquals or assertOptionEquals
*/
- private def assertShuffleReadEquals(r1: ShuffleReadMetrics, r2: ShuffleReadMetrics) {
- assertEquals(r1, r2)
- }
-
- private def assertShuffleWriteEquals(w1: ShuffleWriteMetrics, w2: ShuffleWriteMetrics) {
- assertEquals(w1, w2)
- }
-
- private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
- assertEquals(i1, i2)
- }
-
- private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
- assertEquals(t1, t2)
- }
-
private def assertBlocksEquals(
blocks1: Seq[(BlockId, BlockStatus)],
blocks2: Seq[(BlockId, BlockStatus)]) = {
@@ -851,11 +822,11 @@ private[spark] object JsonProtocolSuite extends Assertions {
t.incMemoryBytesSpilled(a + c)
if (hasHadoopInput) {
- val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = t.inputMetrics
inputMetrics.setBytesRead(d + e + f)
inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
} else {
- val sr = t.registerTempShuffleReadMetrics()
+ val sr = t.createTempShuffleReadMetrics()
sr.incRemoteBytesRead(b + d)
sr.incLocalBlocksFetched(e)
sr.incFetchWaitTime(a + d)
@@ -865,11 +836,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
t.mergeShuffleReadMetrics()
}
if (hasOutput) {
- val outputMetrics = t.registerOutputMetrics(DataWriteMethod.Hadoop)
- outputMetrics.setBytesWritten(a + b + c)
- outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1)
+ t.outputMetrics.setBytesWritten(a + b + c)
+ t.outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
} else {
- val sw = t.registerShuffleWriteMetrics()
+ val sw = t.shuffleWriteMetrics
sw.incBytesWritten(a + b + c)
sw.incWriteTime(b + c + d)
sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
@@ -896,7 +866,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Stage Name": "greetings",
| "Number of Tasks": 200,
| "RDD Info": [],
- | "ParentIDs" : [100, 200, 300],
+ | "Parent IDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -924,7 +894,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Ukraine": "Kiev"
| }
|}
- """
+ """.stripMargin
private val stageCompletedJsonString =
"""
@@ -953,7 +923,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Disk Size": 501
| }
| ],
- | "ParentIDs" : [100, 200, 300],
+ | "Parent IDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -975,7 +945,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| ]
| }
|}
- """
+ """.stripMargin
private val taskStartJsonString =
"""
@@ -1223,7 +1193,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Shuffle Records Written": 12
| },
| "Input Metrics": {
- | "Data Read Method": "Hadoop",
| "Bytes Read": 2100,
| "Records Read": 21
| },
@@ -1244,7 +1213,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| ]
| }
|}
- """
+ """.stripMargin
private val taskEndWithOutputJsonString =
"""
@@ -1304,12 +1273,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Memory Bytes Spilled": 800,
| "Disk Bytes Spilled": 0,
| "Input Metrics": {
- | "Data Read Method": "Hadoop",
| "Bytes Read": 2100,
| "Records Read": 21
| },
| "Output Metrics": {
- | "Data Write Method": "Hadoop",
| "Bytes Written": 1200,
| "Records Written": 12
| },
@@ -1330,7 +1297,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| ]
| }
|}
- """
+ """.stripMargin
private val jobStartJsonString =
"""
@@ -1422,7 +1389,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Disk Size": 1001
| }
| ],
- | "ParentIDs" : [100, 200, 300],
+ | "Parent IDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -1498,7 +1465,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Disk Size": 1502
| }
| ],
- | "ParentIDs" : [100, 200, 300],
+ | "Parent IDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -1590,7 +1557,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Disk Size": 2003
| }
| ],
- | "ParentIDs" : [100, 200, 300],
+ | "Parent IDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -1625,7 +1592,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Ukraine": "Kiev"
| }
|}
- """
+ """.stripMargin
private val jobEndJsonString =
"""
@@ -1637,7 +1604,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Result": "JobSucceeded"
| }
|}
- """
+ """.stripMargin
private val environmentUpdateJsonString =
"""
@@ -1658,7 +1625,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Super library": "/tmp/super_library"
| }
|}
- """
+ """.stripMargin
private val blockManagerAddedJsonString =
"""
@@ -1672,7 +1639,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Maximum Memory": 500,
| "Timestamp": 1
|}
- """
+ """.stripMargin
private val blockManagerRemovedJsonString =
"""
@@ -1685,7 +1652,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| },
| "Timestamp": 2
|}
- """
+ """.stripMargin
private val unpersistRDDJsonString =
"""
@@ -1693,7 +1660,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Event": "SparkListenerUnpersistRDD",
| "RDD ID": 12345
|}
- """
+ """.stripMargin
private val applicationStartJsonString =
"""
@@ -1705,7 +1672,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "User": "Garfield",
| "App Attempt ID": "appAttempt"
|}
- """
+ """.stripMargin
private val applicationStartJsonWithLogUrlsString =
"""
@@ -1721,7 +1688,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "stdout" : "mystdout"
| }
|}
- """
+ """.stripMargin
private val applicationEndJsonString =
"""
@@ -1729,7 +1696,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Event": "SparkListenerApplicationEnd",
| "Timestamp": 42
|}
- """
+ """.stripMargin
private val executorAddedJsonString =
s"""
@@ -1746,7 +1713,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| }
| }
|}
- """
+ """.stripMargin
private val executorRemovedJsonString =
s"""
@@ -1756,7 +1723,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Executor ID": "exec2",
| "Removed Reason": "test reason"
|}
- """
+ """.stripMargin
private val executorMetricsUpdateJsonString =
s"""
@@ -1830,16 +1797,16 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Name": "$UPDATED_BLOCK_STATUSES",
| "Update": [
| {
- | "BlockID": "rdd_0_0",
+ | "Block ID": "rdd_0_0",
| "Status": {
- | "StorageLevel": {
- | "UseDisk": true,
- | "UseMemory": true,
+ | "Storage Level": {
+ | "Use Disk": true,
+ | "Use Memory": true,
| "Deserialized": false,
| "Replication": 2
| },
- | "MemorySize": 0,
- | "DiskSize": 0
+ | "Memory Size": 0,
+ | "Disk Size": 0
| }
| }
| ],
@@ -1911,48 +1878,34 @@ private[spark] object JsonProtocolSuite extends Assertions {
| },
| {
| "ID": 18,
- | "Name": "${input.READ_METHOD}",
- | "Update": "Hadoop",
- | "Internal": true,
- | "Count Failed Values": true
- | },
- | {
- | "ID": 19,
| "Name": "${input.BYTES_READ}",
| "Update": 2100,
| "Internal": true,
| "Count Failed Values": true
| },
| {
- | "ID": 20,
+ | "ID": 19,
| "Name": "${input.RECORDS_READ}",
| "Update": 21,
| "Internal": true,
| "Count Failed Values": true
| },
| {
- | "ID": 21,
- | "Name": "${output.WRITE_METHOD}",
- | "Update": "Hadoop",
- | "Internal": true,
- | "Count Failed Values": true
- | },
- | {
- | "ID": 22,
+ | "ID": 20,
| "Name": "${output.BYTES_WRITTEN}",
| "Update": 1200,
| "Internal": true,
| "Count Failed Values": true
| },
| {
- | "ID": 23,
+ | "ID": 21,
| "Name": "${output.RECORDS_WRITTEN}",
| "Update": 12,
| "Internal": true,
| "Count Failed Values": true
| },
| {
- | "ID": 24,
+ | "ID": 22,
| "Name": "$TEST_ACCUM",
| "Update": 0,
| "Internal": true,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 71f337ce1f..7730823f94 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -630,7 +630,10 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty"),
// [SPARK-14617] Remove deprecated APIs in TaskMetrics
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.InputMetrics$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$")
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$"),
+ // [SPARK-14628] Simplify task metrics by always tracking read/write metrics
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.readMethod"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.writeMethod")
)
case v if v.startsWith("1.6") =>
Seq(