34 files changed, 330 insertions(+), 610 deletions(-)
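Summary of the change, inferred from the hunks below: TaskMetrics previously exposed its input, output, shuffle-read and shuffle-write sub-metrics as Option[...] values that were created on demand through register*Metrics() calls, and the input/output metrics additionally carried readMethod/writeMethod string accumulators. This patch always initializes the four sub-metrics objects, renames registerTempShuffleReadMetrics to createTempShuffleReadMetrics, drops the read/write-method accumulators, and adds an isUpdated predicate so listeners, the REST API and the JSON protocol can still tell whether a task actually touched a given metric. The sketch below illustrates the before/after access pattern; the classes are simplified stand-ins rather than the real org.apache.spark.executor types, and only the member names taken from the hunks (shuffleWriteMetrics, isUpdated, incBytesWritten, ...) are authoritative.

// Minimal sketch of the old vs. new access pattern (simplified stand-in classes).
class ShuffleWriteMetricsSketch {
  private var _bytesWritten = 0L
  private var _recordsWritten = 0L
  private var _writeTime = 0L

  def bytesWritten: Long = _bytesWritten
  def recordsWritten: Long = _recordsWritten
  def writeTime: Long = _writeTime

  // Added by this patch: replaces the "is the Option defined?" check.
  def isUpdated: Boolean = (writeTime | recordsWritten | bytesWritten) != 0

  def incBytesWritten(v: Long): Unit = _bytesWritten += v
  def incRecordsWritten(v: Long): Unit = _recordsWritten += v
  def incWriteTime(v: Long): Unit = _writeTime += v
}

class TaskMetricsSketch {
  // Before: private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None,
  // filled in lazily by registerShuffleWriteMetrics().
  // After: the sub-metrics object always exists and is returned directly.
  val shuffleWriteMetrics: ShuffleWriteMetricsSketch = new ShuffleWriteMetricsSketch
}

object MetricsConsumerSketch {
  // Old consumer style: tm.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L)
  // New consumer style: read the field directly, and use isUpdated only where
  // "this task never wrote shuffle data" must still be distinguished
  // (e.g. the REST API conversions and event-log JSON in the hunks below).
  def shuffleBytes(tm: TaskMetricsSketch): Option[Long] =
    if (tm.shuffleWriteMetrics.isUpdated) Some(tm.shuffleWriteMetrics.bytesWritten) else None
}

The same pattern is applied to InputMetrics, OutputMetrics and ShuffleReadMetrics; the readMethod/writeMethod accumulators they used to carry are removed outright, which is why the JSON protocol below no longer emits the "Data Read Method" and "Data Write Method" fields.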
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 7a60c3eb35..0e9defe5b4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> { this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); - this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); + this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); this.serializer = dep.serializer(); this.shuffleBlockResolver = shuffleBlockResolver; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 0c5fb883a8..daa63d47e6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -118,7 +118,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { this.shuffleId = dep.shuffleId(); this.serializer = dep.serializer().newInstance(); this.partitioner = dep.partitioner(); - this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); + this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index ef79b49083..3e32dd9d63 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -129,7 +129,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.fileBufferSizeBytes = 32 * 1024; - this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); + this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); if (existingInMemorySorter == null) { this.inMemSorter = new UnsafeInMemorySorter( diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 0dd4ec656f..714c8737a9 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -68,14 +68,12 @@ private[spark] object InternalAccumulator { // Names of output metrics object output { - val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod" val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten" val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten" } // Names of input metrics object input { - val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod" val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead" val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead" } @@ -110,8 +108,6 @@ private[spark] object InternalAccumulator { case 
UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam - case input.READ_METHOD => StringAccumulatorParam - case output.WRITE_METHOD => StringAccumulatorParam case _ => LongAccumulatorParam } } @@ -165,7 +161,6 @@ private[spark] object InternalAccumulator { */ def createInputAccums(): Seq[Accumulator[_]] = { Seq[String]( - input.READ_METHOD, input.BYTES_READ, input.RECORDS_READ).map(create) } @@ -175,7 +170,6 @@ private[spark] object InternalAccumulator { */ def createOutputAccums(): Seq[Accumulator[_]] = { Seq[String]( - output.WRITE_METHOD, output.BYTES_WRITTEN, output.RECORDS_WRITTEN).map(create) } diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala index 83e11c5e23..2181bde9f0 100644 --- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -39,31 +39,13 @@ object DataReadMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about reading data from external systems. */ @DeveloperApi -class InputMetrics private ( - _bytesRead: Accumulator[Long], - _recordsRead: Accumulator[Long], - _readMethod: Accumulator[String]) +class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long]) extends Serializable { private[executor] def this(accumMap: Map[String, Accumulator[_]]) { this( TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ), - TaskMetrics.getAccum[String](accumMap, InternalAccumulator.input.READ_METHOD)) - } - - /** - * Create a new [[InputMetrics]] that is not associated with any particular task. - * - * This mainly exists because of SPARK-5225, where we are forced to use a dummy [[InputMetrics]] - * because we want to ignore metrics from a second read method. In the future, we should revisit - * whether this is needed. - * - * A better alternative is [[TaskMetrics.registerInputMetrics]]. - */ - private[executor] def this() { - this(InternalAccumulator.createInputAccums() - .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]) + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ)) } /** @@ -77,13 +59,12 @@ class InputMetrics private ( def recordsRead: Long = _recordsRead.localValue /** - * The source from which this task reads its input. + * Returns true if this metrics has been updated before. 
*/ - def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue) + def isUpdated: Boolean = (bytesRead | recordsRead) != 0 private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v) private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v) private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v) - private[spark] def setReadMethod(v: DataReadMethod.Value): Unit = _readMethod.setValue(v.toString) } diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index 93f953846f..7f20f6bf0d 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -38,17 +38,13 @@ object DataWriteMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about writing data to external systems. */ @DeveloperApi -class OutputMetrics private ( - _bytesWritten: Accumulator[Long], - _recordsWritten: Accumulator[Long], - _writeMethod: Accumulator[String]) +class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long]) extends Serializable { private[executor] def this(accumMap: Map[String, Accumulator[_]]) { this( TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN), - TaskMetrics.getAccum[String](accumMap, InternalAccumulator.output.WRITE_METHOD)) + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN)) } /** @@ -62,13 +58,10 @@ class OutputMetrics private ( def recordsWritten: Long = _recordsWritten.localValue /** - * The source to which this task writes its output. + * Returns true if this metrics has been updated before. */ - def writeMethod: DataWriteMethod.Value = DataWriteMethod.withName(_writeMethod.localValue) + def isUpdated: Boolean = (bytesWritten | recordsWritten) != 0 private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v) private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v) - private[spark] def setWriteMethod(v: DataWriteMethod.Value): Unit = - _writeMethod.setValue(v.toString) - } diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index 71a24770b5..9c78995ff3 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -53,7 +53,7 @@ class ShuffleReadMetrics private ( * many places only to merge their values together later. In the future, we should revisit * whether this is needed. * - * A better alternative is [[TaskMetrics.registerTempShuffleReadMetrics]] followed by + * A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by * [[TaskMetrics.mergeShuffleReadMetrics]]. */ private[spark] def this() { @@ -102,6 +102,11 @@ class ShuffleReadMetrics private ( */ def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched + /** + * Returns true if this metrics has been updated before. 
+ */ + def isUpdated: Boolean = (totalBytesRead | totalBlocksFetched | recordsRead | fetchWaitTime) != 0 + private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v) private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v) private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v) diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index c7aaabb561..cf570e1f9d 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -47,7 +47,7 @@ class ShuffleWriteMetrics private ( * many places only to merge their values together later. In the future, we should revisit * whether this is needed. * - * A better alternative is [[TaskMetrics.registerShuffleWriteMetrics]]. + * A better alternative is [[TaskMetrics.shuffleWriteMetrics]]. */ private[spark] def this() { this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap) @@ -68,6 +68,11 @@ class ShuffleWriteMetrics private ( */ def writeTime: Long = _writeTime.localValue + /** + * Returns true if this metrics has been updated before. + */ + def isUpdated: Boolean = (writeTime | recordsWritten | bytesWritten) != 0 + private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index bda2a91d9d..0198364825 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -91,6 +91,14 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se private val _updatedBlockStatuses = TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES) + private val _inputMetrics = new InputMetrics(initialAccumsMap) + + private val _outputMetrics = new OutputMetrics(initialAccumsMap) + + private val _shuffleReadMetrics = new ShuffleReadMetrics(initialAccumsMap) + + private val _shuffleWriteMetrics = new ShuffleWriteMetrics(initialAccumsMap) + /** * Time taken on the executor to deserialize this task. */ @@ -163,83 +171,23 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se TaskMetrics.getAccum[Long](initialAccumsMap, name) } - - /* ========================== * - | INPUT METRICS | - * ========================== */ - - private var _inputMetrics: Option[InputMetrics] = None - /** * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted * data, defined only in tasks with input. */ - def inputMetrics: Option[InputMetrics] = _inputMetrics - - /** - * Get or create a new [[InputMetrics]] associated with this task. - */ - private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = { - synchronized { - val metrics = _inputMetrics.getOrElse { - val metrics = new InputMetrics(initialAccumsMap) - metrics.setReadMethod(readMethod) - _inputMetrics = Some(metrics) - metrics - } - // If there already exists an InputMetric with the same read method, we can just return - // that one. 
Otherwise, if the read method is different from the one previously seen by - // this task, we return a new dummy one to avoid clobbering the values of the old metrics. - // In the future we should try to store input metrics from all different read methods at - // the same time (SPARK-5225). - if (metrics.readMethod == readMethod) { - metrics - } else { - val m = new InputMetrics - m.setReadMethod(readMethod) - m - } - } - } - - - /* ============================ * - | OUTPUT METRICS | - * ============================ */ - - private var _outputMetrics: Option[OutputMetrics] = None + def inputMetrics: InputMetrics = _inputMetrics /** * Metrics related to writing data externally (e.g. to a distributed filesystem), * defined only in tasks with output. */ - def outputMetrics: Option[OutputMetrics] = _outputMetrics - - /** - * Get or create a new [[OutputMetrics]] associated with this task. - */ - private[spark] def registerOutputMetrics( - writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized { - _outputMetrics.getOrElse { - val metrics = new OutputMetrics(initialAccumsMap) - metrics.setWriteMethod(writeMethod) - _outputMetrics = Some(metrics) - metrics - } - } - - - /* ================================== * - | SHUFFLE READ METRICS | - * ================================== */ - - private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None + def outputMetrics: OutputMetrics = _outputMetrics /** * Metrics related to shuffle read aggregated across all shuffle dependencies. * This is defined only if there are shuffle dependencies in this task. */ - def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics + def shuffleReadMetrics: ShuffleReadMetrics = _shuffleReadMetrics /** * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency. @@ -257,7 +205,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se * merges the temporary values synchronously. Otherwise, all temporary data collected will * be lost. */ - private[spark] def registerTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized { + private[spark] def createTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized { val readMetrics = new ShuffleReadMetrics tempShuffleReadMetrics += readMetrics readMetrics @@ -269,34 +217,14 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se */ private[spark] def mergeShuffleReadMetrics(): Unit = synchronized { if (tempShuffleReadMetrics.nonEmpty) { - val metrics = new ShuffleReadMetrics(initialAccumsMap) - metrics.setMergeValues(tempShuffleReadMetrics) - _shuffleReadMetrics = Some(metrics) + _shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics) } } - /* =================================== * - | SHUFFLE WRITE METRICS | - * =================================== */ - - private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None - /** * Metrics related to shuffle write, defined only in shuffle map stages. */ - def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics - - /** - * Get or create a new [[ShuffleWriteMetrics]] associated with this task. 
- */ - private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized { - _shuffleWriteMetrics.getOrElse { - val metrics = new ShuffleWriteMetrics(initialAccumsMap) - _shuffleWriteMetrics = Some(metrics) - metrics - } - } - + def shuffleWriteMetrics: ShuffleWriteMetrics = _shuffleWriteMetrics /* ========================== * | OTHER THINGS | @@ -316,28 +244,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se def accumulatorUpdates(): Seq[AccumulableInfo] = { accums.map { a => a.toInfo(Some(a.localValue), None) } } - - // If we are reconstructing this TaskMetrics on the driver, some metrics may already be set. - // If so, initialize all relevant metrics classes so listeners can access them downstream. - { - var (hasShuffleRead, hasShuffleWrite, hasInput, hasOutput) = (false, false, false, false) - initialAccums - .filter { a => a.localValue != a.zero } - .foreach { a => - a.name.get match { - case sr if sr.startsWith(SHUFFLE_READ_METRICS_PREFIX) => hasShuffleRead = true - case sw if sw.startsWith(SHUFFLE_WRITE_METRICS_PREFIX) => hasShuffleWrite = true - case in if in.startsWith(INPUT_METRICS_PREFIX) => hasInput = true - case out if out.startsWith(OUTPUT_METRICS_PREFIX) => hasOutput = true - case _ => - } - } - if (hasShuffleRead) { _shuffleReadMetrics = Some(new ShuffleReadMetrics(initialAccumsMap)) } - if (hasShuffleWrite) { _shuffleWriteMetrics = Some(new ShuffleWriteMetrics(initialAccumsMap)) } - if (hasInput) { _inputMetrics = Some(new InputMetrics(initialAccumsMap)) } - if (hasOutput) { _outputMetrics = Some(new OutputMetrics(initialAccumsMap)) } - } - } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 35d190b464..6b1e15572c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -213,7 +213,7 @@ class HadoopRDD[K, V]( logInfo("Input split: " + split.inputSplit) val jobConf = getJobConf() - val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) + val inputMetrics = context.taskMetrics().inputMetrics val existingBytesRead = inputMetrics.bytesRead // Sets the thread local variable for the file's name diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 3ccd616cbf..a71c191b31 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -130,7 +130,7 @@ class NewHadoopRDD[K, V]( logInfo("Input split: " + split.serializableHadoopSplit) val conf = getConf - val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) + val inputMetrics = context.taskMetrics().inputMetrics val existingBytesRead = inputMetrics.bytesRead // Find a function that will return the FileSystem bytes read by this thread. 
Do this before diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 085829af6e..7936d8e1d4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1218,7 +1218,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) context: TaskContext): Option[(OutputMetrics, () => Long)] = { val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() bytesWrittenCallback.map { b => - (context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop), b) + (context.taskMetrics().outputMetrics, b) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 36ff3bcaae..f6e0148f78 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -332,7 +332,7 @@ abstract class RDD[T: ClassTag]( }) match { case Left(blockResult) => if (readCachedBlock) { - val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod) + val existingMetrics = context.taskMetrics().inputMetrics existingMetrics.incBytesRead(blockResult.bytes) new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) { override def next(): T = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala index 309f4b806b..3c8cab7504 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala @@ -47,19 +47,19 @@ class StatsReportListener extends SparkListener with Logging { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { implicit val sc = stageCompleted this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}") - showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics) + showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics) // Shuffle write showBytesDistribution("shuffle bytes written:", - (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics) + (_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics) // Fetch & I/O showMillisDistribution("fetch wait time:", - (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics) + (_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics) showBytesDistribution("remote bytes read:", - (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics) + (_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics) showBytesDistribution("task result size:", - (_, metric) => Some(metric.resultSize), taskInfoMetrics) + (_, metric) => metric.resultSize, taskInfoMetrics) // Runtime breakdown val runtimePcts = taskInfoMetrics.map { case (info, metrics) => @@ -95,17 +95,17 @@ private[spark] object StatsReportListener extends Logging { def extractDoubleDistribution( taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)], - getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = { - Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) }) + getMetric: (TaskInfo, TaskMetrics) => Double): Option[Distribution] = { + Distribution(taskInfoMetrics.map { case (info, metric) => 
getMetric(info, metric) }) } // Is there some way to setup the types that I can get rid of this completely? def extractLongDistribution( taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)], - getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = { + getMetric: (TaskInfo, TaskMetrics) => Long): Option[Distribution] = { extractDoubleDistribution( taskInfoMetrics, - (info, metric) => { getMetric(info, metric).map(_.toDouble) }) + (info, metric) => { getMetric(info, metric).toDouble }) } def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) { @@ -117,9 +117,9 @@ private[spark] object StatsReportListener extends Logging { } def showDistribution( - heading: String, - dOpt: Option[Distribution], - formatNumber: Double => String) { + heading: String, + dOpt: Option[Distribution], + formatNumber: Double => String) { dOpt.foreach { d => showDistribution(heading, d, formatNumber)} } @@ -129,17 +129,17 @@ private[spark] object StatsReportListener extends Logging { } def showDistribution( - heading: String, - format: String, - getMetric: (TaskInfo, TaskMetrics) => Option[Double], - taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { + heading: String, + format: String, + getMetric: (TaskInfo, TaskMetrics) => Double, + taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format) } def showBytesDistribution( - heading: String, - getMetric: (TaskInfo, TaskMetrics) => Option[Long], - taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { + heading: String, + getMetric: (TaskInfo, TaskMetrics) => Long, + taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric)) } @@ -157,9 +157,9 @@ private[spark] object StatsReportListener extends Logging { } def showMillisDistribution( - heading: String, - getMetric: (TaskInfo, TaskMetrics) => Option[Long], - taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { + heading: String, + getMetric: (TaskInfo, TaskMetrics) => Long, + taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric)) } @@ -190,7 +190,7 @@ private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Doubl private object RuntimePercentage { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { val denom = totalTime.toDouble - val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime) + val fetchTime = Some(metrics.shuffleReadMetrics.fetchWaitTime) val fetch = fetchTime.map(_ / denom) val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom val other = 1.0 - (exec + fetch.getOrElse(0d)) diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 876cdfaa87..5794f542b7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -67,7 +67,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( } // Update the context task metrics for each record read. 
- val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics() + val readMetrics = context.taskMetrics.createTempShuffleReadMetrics() val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( recordIter.map { record => readMetrics.incRecordsRead(1) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 9276d95012..6c4444ffb4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -41,7 +41,7 @@ private[spark] class HashShuffleWriter[K, V]( // we don't try deleting files, etc twice. private var stopping = false - private val writeMetrics = metrics.registerShuffleWriteMetrics() + private val writeMetrics = metrics.shuffleWriteMetrics private val blockManager = SparkEnv.get.blockManager private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 8ab1cee2e8..1adacabc86 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -45,7 +45,7 @@ private[spark] class SortShuffleWriter[K, V, C]( private var mapStatus: MapStatus = null - private val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics() + private val writeMetrics = context.taskMetrics().shuffleWriteMetrics /** Write a bunch of records to this task's output */ override def write(records: Iterator[Product2[K, V]]): Unit = { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index f8d6e9fbbb..85452d6497 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -170,7 +170,11 @@ private[v1] object AllStagesResource { val inputMetrics: Option[InputMetricDistributions] = new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) { def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = { - raw.inputMetrics + if (raw.inputMetrics.isUpdated) { + Some(raw.inputMetrics) + } else { + None + } } def build: InputMetricDistributions = new InputMetricDistributions( @@ -182,7 +186,11 @@ private[v1] object AllStagesResource { val outputMetrics: Option[OutputMetricDistributions] = new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) { def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = { - raw.outputMetrics + if (raw.outputMetrics.isUpdated) { + Some(raw.outputMetrics) + } else { + None + } } def build: OutputMetricDistributions = new OutputMetricDistributions( bytesWritten = submetricQuantiles(_.bytesWritten), @@ -194,7 +202,11 @@ private[v1] object AllStagesResource { new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics, quantiles) { def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = { - raw.shuffleReadMetrics + if (raw.shuffleReadMetrics.isUpdated) { + Some(raw.shuffleReadMetrics) + } else { + None + } } def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions( 
readBytes = submetricQuantiles(_.totalBytesRead), @@ -211,7 +223,11 @@ private[v1] object AllStagesResource { new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics, quantiles) { def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = { - raw.shuffleWriteMetrics + if (raw.shuffleWriteMetrics.isUpdated) { + Some(raw.shuffleWriteMetrics) + } else { + None + } } def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions( writeBytes = submetricQuantiles(_.bytesWritten), @@ -250,44 +266,62 @@ private[v1] object AllStagesResource { resultSerializationTime = internal.resultSerializationTime, memoryBytesSpilled = internal.memoryBytesSpilled, diskBytesSpilled = internal.diskBytesSpilled, - inputMetrics = internal.inputMetrics.map { convertInputMetrics }, - outputMetrics = Option(internal.outputMetrics).flatten.map { convertOutputMetrics }, - shuffleReadMetrics = internal.shuffleReadMetrics.map { convertShuffleReadMetrics }, - shuffleWriteMetrics = internal.shuffleWriteMetrics.map { convertShuffleWriteMetrics } + inputMetrics = convertInputMetrics(internal.inputMetrics), + outputMetrics = convertOutputMetrics(internal.outputMetrics), + shuffleReadMetrics = convertShuffleReadMetrics(internal.shuffleReadMetrics), + shuffleWriteMetrics = convertShuffleWriteMetrics(internal.shuffleWriteMetrics) ) } - def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = { - new InputMetrics( - bytesRead = internal.bytesRead, - recordsRead = internal.recordsRead - ) + def convertInputMetrics(internal: InternalInputMetrics): Option[InputMetrics] = { + if (internal.isUpdated) { + Some(new InputMetrics( + bytesRead = internal.bytesRead, + recordsRead = internal.recordsRead + )) + } else { + None + } } - def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = { - new OutputMetrics( - bytesWritten = internal.bytesWritten, - recordsWritten = internal.recordsWritten - ) + def convertOutputMetrics(internal: InternalOutputMetrics): Option[OutputMetrics] = { + if (internal.isUpdated) { + Some(new OutputMetrics( + bytesWritten = internal.bytesWritten, + recordsWritten = internal.recordsWritten + )) + } else { + None + } } - def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = { - new ShuffleReadMetrics( - remoteBlocksFetched = internal.remoteBlocksFetched, - localBlocksFetched = internal.localBlocksFetched, - fetchWaitTime = internal.fetchWaitTime, - remoteBytesRead = internal.remoteBytesRead, - totalBlocksFetched = internal.totalBlocksFetched, - recordsRead = internal.recordsRead - ) + def convertShuffleReadMetrics( + internal: InternalShuffleReadMetrics): Option[ShuffleReadMetrics] = { + if (internal.isUpdated) { + Some(new ShuffleReadMetrics( + remoteBlocksFetched = internal.remoteBlocksFetched, + localBlocksFetched = internal.localBlocksFetched, + fetchWaitTime = internal.fetchWaitTime, + remoteBytesRead = internal.remoteBytesRead, + totalBlocksFetched = internal.totalBlocksFetched, + recordsRead = internal.recordsRead + )) + } else { + None + } } - def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = { - new ShuffleWriteMetrics( - bytesWritten = internal.bytesWritten, - writeTime = internal.writeTime, - recordsWritten = internal.recordsWritten - ) + def convertShuffleWriteMetrics( + internal: InternalShuffleWriteMetrics): Option[ShuffleWriteMetrics] = { + if ((internal.bytesWritten | internal.writeTime | internal.recordsWritten) == 
0) { + None + } else { + Some(new ShuffleWriteMetrics( + bytesWritten = internal.bytesWritten, + writeTime = internal.writeTime, + recordsWritten = internal.recordsWritten + )) + } } } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 4ec5b4bbb0..4dc2f36232 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -108,7 +108,7 @@ final class ShuffleBlockFetcherIterator( /** Current number of requests in flight */ private[this] var reqsInFlight = 0 - private[this] val shuffleMetrics = context.taskMetrics().registerTempShuffleReadMetrics() + private[this] val shuffleMetrics = context.taskMetrics().createTempShuffleReadMetrics() /** * Whether the iterator is still active. If isZombie is true, the callback interface will no diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 3fd0efd3a1..676f445751 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -119,26 +119,19 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar // Update shuffle read/write val metrics = taskEnd.taskMetrics if (metrics != null) { - metrics.inputMetrics.foreach { inputMetrics => - executorToInputBytes(eid) = - executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead - executorToInputRecords(eid) = - executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead - } - metrics.outputMetrics.foreach { outputMetrics => - executorToOutputBytes(eid) = - executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten - executorToOutputRecords(eid) = - executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten - } - metrics.shuffleReadMetrics.foreach { shuffleRead => - executorToShuffleRead(eid) = - executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead - } - metrics.shuffleWriteMetrics.foreach { shuffleWrite => - executorToShuffleWrite(eid) = - executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten - } + executorToInputBytes(eid) = + executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead + executorToInputRecords(eid) = + executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead + executorToOutputBytes(eid) = + executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten + executorToOutputRecords(eid) = + executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten + + executorToShuffleRead(eid) = + executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead + executorToShuffleWrite(eid) = + executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 13f5f84d06..9e4771ce4a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -434,50 +434,50 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with 
Logging { val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary) val shuffleWriteDelta = - (taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L) - - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L)) + taskMetrics.shuffleWriteMetrics.bytesWritten - + oldMetrics.map(_.shuffleWriteMetrics.bytesWritten).getOrElse(0L) stageData.shuffleWriteBytes += shuffleWriteDelta execSummary.shuffleWrite += shuffleWriteDelta val shuffleWriteRecordsDelta = - (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L) - - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L)) + taskMetrics.shuffleWriteMetrics.recordsWritten - + oldMetrics.map(_.shuffleWriteMetrics.recordsWritten).getOrElse(0L) stageData.shuffleWriteRecords += shuffleWriteRecordsDelta execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta val shuffleReadDelta = - (taskMetrics.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L) - - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.totalBytesRead).getOrElse(0L)) + taskMetrics.shuffleReadMetrics.totalBytesRead - + oldMetrics.map(_.shuffleReadMetrics.totalBytesRead).getOrElse(0L) stageData.shuffleReadTotalBytes += shuffleReadDelta execSummary.shuffleRead += shuffleReadDelta val shuffleReadRecordsDelta = - (taskMetrics.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L) - - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.recordsRead).getOrElse(0L)) + taskMetrics.shuffleReadMetrics.recordsRead - + oldMetrics.map(_.shuffleReadMetrics.recordsRead).getOrElse(0L) stageData.shuffleReadRecords += shuffleReadRecordsDelta execSummary.shuffleReadRecords += shuffleReadRecordsDelta val inputBytesDelta = - (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L) - - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)) + taskMetrics.inputMetrics.bytesRead - + oldMetrics.map(_.inputMetrics.bytesRead).getOrElse(0L) stageData.inputBytes += inputBytesDelta execSummary.inputBytes += inputBytesDelta val inputRecordsDelta = - (taskMetrics.inputMetrics.map(_.recordsRead).getOrElse(0L) - - oldMetrics.flatMap(_.inputMetrics).map(_.recordsRead).getOrElse(0L)) + taskMetrics.inputMetrics.recordsRead - + oldMetrics.map(_.inputMetrics.recordsRead).getOrElse(0L) stageData.inputRecords += inputRecordsDelta execSummary.inputRecords += inputRecordsDelta val outputBytesDelta = - (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L) - - oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L)) + taskMetrics.outputMetrics.bytesWritten - + oldMetrics.map(_.outputMetrics.bytesWritten).getOrElse(0L) stageData.outputBytes += outputBytesDelta execSummary.outputBytes += outputBytesDelta val outputRecordsDelta = - (taskMetrics.outputMetrics.map(_.recordsWritten).getOrElse(0L) - - oldMetrics.flatMap(_.outputMetrics).map(_.recordsWritten).getOrElse(0L)) + taskMetrics.outputMetrics.recordsWritten - + oldMetrics.map(_.outputMetrics.recordsWritten).getOrElse(0L) stageData.outputRecords += outputRecordsDelta execSummary.outputRecords += outputRecordsDelta diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8a44bbd9fc..5d1928ac6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -428,29 +428,29 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } val inputSizes = validTasks.map { taskUIData: 
TaskUIData => - taskUIData.metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble + taskUIData.metrics.get.inputMetrics.bytesRead.toDouble } val inputRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble + taskUIData.metrics.get.inputMetrics.recordsRead.toDouble } val inputQuantiles = <td>Input Size / Records</td> +: getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords) val outputSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble + taskUIData.metrics.get.outputMetrics.bytesWritten.toDouble } val outputRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + taskUIData.metrics.get.outputMetrics.recordsWritten.toDouble } val outputQuantiles = <td>Output Size / Records</td> +: getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords) val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble + taskUIData.metrics.get.shuffleReadMetrics.fetchWaitTime.toDouble } val shuffleReadBlockedQuantiles = <td> @@ -462,10 +462,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getFormattedTimeQuantiles(shuffleReadBlockedTimes) val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble + taskUIData.metrics.get.shuffleReadMetrics.totalBytesRead.toDouble } val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble + taskUIData.metrics.get.shuffleReadMetrics.recordsRead.toDouble } val shuffleReadTotalQuantiles = <td> @@ -477,7 +477,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords) val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + taskUIData.metrics.get.shuffleReadMetrics.remoteBytesRead.toDouble } val shuffleReadRemoteQuantiles = <td> @@ -489,11 +489,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getFormattedSizeQuantiles(shuffleReadRemoteSizes) val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble + taskUIData.metrics.get.shuffleWriteMetrics.bytesWritten.toDouble } val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData => - taskUIData.metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + taskUIData.metrics.get.shuffleWriteMetrics.recordsWritten.toDouble } val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +: @@ -603,11 +603,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val metricsOpt = taskUIData.metrics val shuffleReadTime = - metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L) + metricsOpt.map(_.shuffleReadMetrics.fetchWaitTime).getOrElse(0L) val shuffleReadTimeProportion = toProportion(shuffleReadTime) val shuffleWriteTime = - (metricsOpt.flatMap(_.shuffleWriteMetrics - .map(_.writeTime)).getOrElse(0L) / 1e6).toLong + 
(metricsOpt.map(_.shuffleWriteMetrics.writeTime).getOrElse(0L) / 1e6).toLong val shuffleWriteTimeProportion = toProportion(shuffleWriteTime) val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L) @@ -890,21 +889,21 @@ private[ui] class TaskDataSource( } val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L) - val maybeInput = metrics.flatMap(_.inputMetrics) + val maybeInput = metrics.map(_.inputMetrics) val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L) val inputReadable = maybeInput - .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") + .map(m => s"${Utils.bytesToString(m.bytesRead)}") .getOrElse("") val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("") - val maybeOutput = metrics.flatMap(_.outputMetrics) + val maybeOutput = metrics.map(_.outputMetrics) val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L) val outputReadable = maybeOutput .map(m => s"${Utils.bytesToString(m.bytesWritten)}") .getOrElse("") val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("") - val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics) + val maybeShuffleRead = metrics.map(_.shuffleReadMetrics) val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L) val shuffleReadBlockedTimeReadable = maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("") @@ -918,14 +917,14 @@ private[ui] class TaskDataSource( val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L) val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("") - val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics) + val maybeShuffleWrite = metrics.map(_.shuffleWriteMetrics) val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L) val shuffleWriteReadable = maybeShuffleWrite .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("") val shuffleWriteRecords = maybeShuffleWrite .map(_.recordsWritten.toString).getOrElse("") - val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime) + val maybeWriteTime = metrics.map(_.shuffleWriteMetrics.writeTime) val writeTimeSortable = maybeWriteTime.getOrElse(0L) val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms => if (ms == 0) "" else UIUtils.formatDuration(ms) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 558767e36f..17b33c7c62 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -326,33 +326,35 @@ private[spark] object JsonProtocol { } def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { - val shuffleReadMetrics: JValue = - taskMetrics.shuffleReadMetrics.map { rm => - ("Remote Blocks Fetched" -> rm.remoteBlocksFetched) ~ - ("Local Blocks Fetched" -> rm.localBlocksFetched) ~ - ("Fetch Wait Time" -> rm.fetchWaitTime) ~ - ("Remote Bytes Read" -> rm.remoteBytesRead) ~ - ("Local Bytes Read" -> rm.localBytesRead) ~ - ("Total Records Read" -> rm.recordsRead) - }.getOrElse(JNothing) - val shuffleWriteMetrics: JValue = - taskMetrics.shuffleWriteMetrics.map { wm => - ("Shuffle Bytes Written" -> wm.bytesWritten) ~ - ("Shuffle Write Time" -> wm.writeTime) ~ - ("Shuffle Records Written" -> wm.recordsWritten) - }.getOrElse(JNothing) - val inputMetrics: JValue = - taskMetrics.inputMetrics.map { im => - ("Data 
Read Method" -> im.readMethod.toString) ~ - ("Bytes Read" -> im.bytesRead) ~ - ("Records Read" -> im.recordsRead) - }.getOrElse(JNothing) - val outputMetrics: JValue = - taskMetrics.outputMetrics.map { om => - ("Data Write Method" -> om.writeMethod.toString) ~ - ("Bytes Written" -> om.bytesWritten) ~ - ("Records Written" -> om.recordsWritten) - }.getOrElse(JNothing) + val shuffleReadMetrics: JValue = if (taskMetrics.shuffleReadMetrics.isUpdated) { + ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~ + ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~ + ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~ + ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~ + ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~ + ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead) + } else { + JNothing + } + val shuffleWriteMetrics: JValue = if (taskMetrics.shuffleWriteMetrics.isUpdated) { + ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~ + ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~ + ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten) + } else { + JNothing + } + val inputMetrics: JValue = if (taskMetrics.inputMetrics.isUpdated) { + ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~ + ("Records Read" -> taskMetrics.inputMetrics.recordsRead) + } else { + JNothing + } + val outputMetrics: JValue = if (taskMetrics.outputMetrics.isUpdated) { + ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~ + ("Records Written" -> taskMetrics.outputMetrics.recordsWritten) + } else { + JNothing + } val updatedBlocks = JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) => ("Block ID" -> id.toString) ~ @@ -781,7 +783,7 @@ private[spark] object JsonProtocol { // Shuffle read metrics Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson => - val readMetrics = metrics.registerTempShuffleReadMetrics() + val readMetrics = metrics.createTempShuffleReadMetrics() readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int]) readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int]) readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long]) @@ -794,7 +796,7 @@ private[spark] object JsonProtocol { // Shuffle write metrics // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes. 
Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson => - val writeMetrics = metrics.registerShuffleWriteMetrics() + val writeMetrics = metrics.shuffleWriteMetrics writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long]) writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written") .extractOpt[Long].getOrElse(0L)) @@ -803,16 +805,14 @@ private[spark] object JsonProtocol { // Output metrics Utils.jsonOption(json \ "Output Metrics").foreach { outJson => - val writeMethod = DataWriteMethod.withName((outJson \ "Data Write Method").extract[String]) - val outputMetrics = metrics.registerOutputMetrics(writeMethod) + val outputMetrics = metrics.outputMetrics outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long]) outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L)) } // Input metrics Utils.jsonOption(json \ "Input Metrics").foreach { inJson => - val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String]) - val inputMetrics = metrics.registerInputMetrics(readMethod) + val inputMetrics = metrics.inputMetrics inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long]) inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L)) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 561ba22df5..916053f42d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -645,7 +645,7 @@ private[spark] class ExternalSorter[K, V, C]( blockId: BlockId, outputFile: File): Array[Long] = { - val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics() + val writeMetrics = context.taskMetrics().shuffleWriteMetrics // Track location of each range in the output file val lengths = new Array[Long](numPartitions) diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 30750b1bf1..fbaaa1cf49 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -249,8 +249,8 @@ public class UnsafeShuffleWriterSuite { assertTrue(mapStatus.isDefined()); assertTrue(mergedOutputFile.exists()); assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile); - assertEquals(0, taskMetrics.shuffleWriteMetrics().get().recordsWritten()); - assertEquals(0, taskMetrics.shuffleWriteMetrics().get().bytesWritten()); + assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten()); + assertEquals(0, taskMetrics.shuffleWriteMetrics().bytesWritten()); assertEquals(0, taskMetrics.diskBytesSpilled()); assertEquals(0, taskMetrics.memoryBytesSpilled()); } @@ -279,7 +279,7 @@ public class UnsafeShuffleWriterSuite { HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile())); assertSpillFilesWereCleanedUp(); - ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); + ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics(); assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertEquals(0, taskMetrics.diskBytesSpilled()); assertEquals(0, taskMetrics.memoryBytesSpilled()); @@ -321,7 +321,7 @@ public class 
UnsafeShuffleWriterSuite { assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile())); assertSpillFilesWereCleanedUp(); - ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); + ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics(); assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L)); assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length())); @@ -383,7 +383,7 @@ public class UnsafeShuffleWriterSuite { writer.stop(true); readRecordsFromFile(); assertSpillFilesWereCleanedUp(); - ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); + ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics(); assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L)); assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length())); @@ -404,7 +404,7 @@ public class UnsafeShuffleWriterSuite { writer.stop(true); readRecordsFromFile(); assertSpillFilesWereCleanedUp(); - ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics().get(); + ShuffleWriteMetrics shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics(); assertEquals(dataToWrite.size(), shuffleWriteMetrics.recordsWritten()); assertThat(taskMetrics.diskBytesSpilled(), greaterThan(0L)); assertThat(taskMetrics.diskBytesSpilled(), lessThan(mergedOutputFile.length())); diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 474550608b..db087a9c3c 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -59,11 +59,9 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { assert(getParam(shuffleWrite.RECORDS_WRITTEN) === LongAccumulatorParam) assert(getParam(shuffleWrite.WRITE_TIME) === LongAccumulatorParam) // input - assert(getParam(input.READ_METHOD) === StringAccumulatorParam) assert(getParam(input.RECORDS_READ) === LongAccumulatorParam) assert(getParam(input.BYTES_READ) === LongAccumulatorParam) // output - assert(getParam(output.WRITE_METHOD) === StringAccumulatorParam) assert(getParam(output.RECORDS_WRITTEN) === LongAccumulatorParam) assert(getParam(output.BYTES_WRITTEN) === LongAccumulatorParam) // default to Long @@ -77,18 +75,15 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { val executorRunTime = create(EXECUTOR_RUN_TIME) val updatedBlockStatuses = create(UPDATED_BLOCK_STATUSES) val shuffleRemoteBlocksRead = create(shuffleRead.REMOTE_BLOCKS_FETCHED) - val inputReadMethod = create(input.READ_METHOD) assert(executorRunTime.name === Some(EXECUTOR_RUN_TIME)) assert(updatedBlockStatuses.name === Some(UPDATED_BLOCK_STATUSES)) assert(shuffleRemoteBlocksRead.name === Some(shuffleRead.REMOTE_BLOCKS_FETCHED)) - assert(inputReadMethod.name === Some(input.READ_METHOD)) assert(executorRunTime.value.isInstanceOf[Long]) assert(updatedBlockStatuses.value.isInstanceOf[Seq[_]]) // We cannot assert the type of the value directly since the type parameter is erased. // Instead, try casting a `Seq` of expected type and see if it fails in run time. 
updatedBlockStatuses.setValueAny(Seq.empty[(BlockId, BlockStatus)]) assert(shuffleRemoteBlocksRead.value.isInstanceOf[Int]) - assert(inputReadMethod.value.isInstanceOf[String]) // default to Long val anything = create(METRICS_PREFIX + "anything") assert(anything.value.isInstanceOf[Long]) diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index cd7d2e1570..079109d137 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -450,14 +450,10 @@ object ShuffleSuite { @volatile var bytesRead: Long = 0 val listener = new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - taskEnd.taskMetrics.shuffleWriteMetrics.foreach { m => - recordsWritten += m.recordsWritten - bytesWritten += m.bytesWritten - } - taskEnd.taskMetrics.shuffleReadMetrics.foreach { m => - recordsRead += m.recordsRead - bytesRead += m.totalBytesRead - } + recordsWritten += taskEnd.taskMetrics.shuffleWriteMetrics.recordsWritten + bytesWritten += taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten + recordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead + bytesRead += taskEnd.taskMetrics.shuffleReadMetrics.totalBytesRead } } sc.addSparkListener(listener) diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index d91f50f18f..a263fce8ab 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -30,24 +30,6 @@ class TaskMetricsSuite extends SparkFunSuite { import StorageLevel._ import TaskMetricsSuite._ - test("create") { - val internalAccums = InternalAccumulator.createAll() - val tm1 = new TaskMetrics - val tm2 = new TaskMetrics(internalAccums) - assert(tm1.accumulatorUpdates().size === internalAccums.size) - assert(tm1.shuffleReadMetrics.isEmpty) - assert(tm1.shuffleWriteMetrics.isEmpty) - assert(tm1.inputMetrics.isEmpty) - assert(tm1.outputMetrics.isEmpty) - assert(tm2.accumulatorUpdates().size === internalAccums.size) - assert(tm2.shuffleReadMetrics.isEmpty) - assert(tm2.shuffleWriteMetrics.isEmpty) - assert(tm2.inputMetrics.isEmpty) - assert(tm2.outputMetrics.isEmpty) - // TaskMetrics constructor expects minimal set of initial accumulators - intercept[IllegalArgumentException] { new TaskMetrics(Seq.empty[Accumulator[_]]) } - } - test("create with unnamed accum") { intercept[IllegalArgumentException] { new TaskMetrics( @@ -110,11 +92,9 @@ class TaskMetricsSuite extends SparkFunSuite { .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]] accums(BYTES_READ).setValueAny(1L) accums(RECORDS_READ).setValueAny(2L) - accums(READ_METHOD).setValueAny(DataReadMethod.Hadoop.toString) val im = new InputMetrics(accums) assert(im.bytesRead === 1L) assert(im.recordsRead === 2L) - assert(im.readMethod === DataReadMethod.Hadoop) } test("create output metrics") { @@ -123,11 +103,9 @@ class TaskMetricsSuite extends SparkFunSuite { .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]] accums(BYTES_WRITTEN).setValueAny(1L) accums(RECORDS_WRITTEN).setValueAny(2L) - accums(WRITE_METHOD).setValueAny(DataWriteMethod.Hadoop.toString) val om = new OutputMetrics(accums) assert(om.bytesWritten === 1L) assert(om.recordsWritten === 2L) - assert(om.writeMethod === DataWriteMethod.Hadoop) } test("mutating values") { @@ -183,14 +161,12 @@ class TaskMetricsSuite extends 
SparkFunSuite { val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = { - assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value) + assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics), accums, name, value) } // create shuffle read metrics - assert(tm.shuffleReadMetrics.isEmpty) - tm.registerTempShuffleReadMetrics() + tm.createTempShuffleReadMetrics() tm.mergeShuffleReadMetrics() - assert(tm.shuffleReadMetrics.isDefined) - val sr = tm.shuffleReadMetrics.get + val sr = tm.shuffleReadMetrics // initial values assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0) assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0) @@ -237,13 +213,10 @@ class TaskMetricsSuite extends SparkFunSuite { val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = { - assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value) + assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics), accums, name, value) } // create shuffle write metrics - assert(tm.shuffleWriteMetrics.isEmpty) - tm.registerShuffleWriteMetrics() - assert(tm.shuffleWriteMetrics.isDefined) - val sw = tm.shuffleWriteMetrics.get + val sw = tm.shuffleWriteMetrics // initial values assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L) assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L) @@ -270,28 +243,22 @@ class TaskMetricsSuite extends SparkFunSuite { val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = { - assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value, + assertValueEquals(tm, tm => tmValue(tm.inputMetrics), accums, name, value, (x: Any, y: Any) => assert(x.toString === y.toString)) } // create input metrics - assert(tm.inputMetrics.isEmpty) - tm.registerInputMetrics(DataReadMethod.Memory) - assert(tm.inputMetrics.isDefined) - val in = tm.inputMetrics.get + val in = tm.inputMetrics // initial values assertValEquals(_.bytesRead, BYTES_READ, 0L) assertValEquals(_.recordsRead, RECORDS_READ, 0L) - assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Memory) // set and increment values in.setBytesRead(1L) in.setBytesRead(2L) in.incRecordsRead(1L) in.incRecordsRead(2L) - in.setReadMethod(DataReadMethod.Disk) // assert new values exist assertValEquals(_.bytesRead, BYTES_READ, 2L) assertValEquals(_.recordsRead, RECORDS_READ, 3L) - assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Disk) } test("mutating output metrics values") { @@ -299,85 +266,42 @@ class TaskMetricsSuite extends SparkFunSuite { val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = { - assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value, + assertValueEquals(tm, tm => tmValue(tm.outputMetrics), accums, name, value, (x: Any, y: Any) => assert(x.toString === y.toString)) } // create input metrics - assert(tm.outputMetrics.isEmpty) - tm.registerOutputMetrics(DataWriteMethod.Hadoop) - assert(tm.outputMetrics.isDefined) - val out = tm.outputMetrics.get + val out = tm.outputMetrics // initial values assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L) assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L) - 
assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop) // set values out.setBytesWritten(1L) out.setBytesWritten(2L) out.setRecordsWritten(3L) out.setRecordsWritten(4L) - out.setWriteMethod(DataWriteMethod.Hadoop) // assert new values exist assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L) assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L) - // Note: this doesn't actually test anything, but there's only one DataWriteMethod - // so we can't set it to anything else - assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop) } test("merging multiple shuffle read metrics") { val tm = new TaskMetrics - assert(tm.shuffleReadMetrics.isEmpty) - val sr1 = tm.registerTempShuffleReadMetrics() - val sr2 = tm.registerTempShuffleReadMetrics() - val sr3 = tm.registerTempShuffleReadMetrics() - assert(tm.shuffleReadMetrics.isEmpty) + val sr1 = tm.createTempShuffleReadMetrics() + val sr2 = tm.createTempShuffleReadMetrics() + val sr3 = tm.createTempShuffleReadMetrics() sr1.setRecordsRead(10L) sr2.setRecordsRead(10L) sr1.setFetchWaitTime(1L) sr2.setFetchWaitTime(2L) sr3.setFetchWaitTime(3L) tm.mergeShuffleReadMetrics() - assert(tm.shuffleReadMetrics.isDefined) - val sr = tm.shuffleReadMetrics.get - assert(sr.remoteBlocksFetched === 0L) - assert(sr.recordsRead === 20L) - assert(sr.fetchWaitTime === 6L) + assert(tm.shuffleReadMetrics.remoteBlocksFetched === 0L) + assert(tm.shuffleReadMetrics.recordsRead === 20L) + assert(tm.shuffleReadMetrics.fetchWaitTime === 6L) // SPARK-5701: calling merge without any shuffle deps does nothing val tm2 = new TaskMetrics tm2.mergeShuffleReadMetrics() - assert(tm2.shuffleReadMetrics.isEmpty) - } - - test("register multiple shuffle write metrics") { - val tm = new TaskMetrics - val sw1 = tm.registerShuffleWriteMetrics() - val sw2 = tm.registerShuffleWriteMetrics() - assert(sw1 === sw2) - assert(tm.shuffleWriteMetrics === Some(sw1)) - } - - test("register multiple input metrics") { - val tm = new TaskMetrics - val im1 = tm.registerInputMetrics(DataReadMethod.Memory) - val im2 = tm.registerInputMetrics(DataReadMethod.Memory) - // input metrics with a different read method than the one already registered are ignored - val im3 = tm.registerInputMetrics(DataReadMethod.Hadoop) - assert(im1 === im2) - assert(im1 !== im3) - assert(tm.inputMetrics === Some(im1)) - im2.setBytesRead(50L) - im3.setBytesRead(100L) - assert(tm.inputMetrics.get.bytesRead === 50L) - } - - test("register multiple output metrics") { - val tm = new TaskMetrics - val om1 = tm.registerOutputMetrics(DataWriteMethod.Hadoop) - val om2 = tm.registerOutputMetrics(DataWriteMethod.Hadoop) - assert(om1 === om2) - assert(tm.outputMetrics === Some(om1)) } test("additional accumulables") { @@ -424,10 +348,6 @@ class TaskMetricsSuite extends SparkFunSuite { assert(srAccum.isDefined) srAccum.get.asInstanceOf[Accumulator[Long]] += 10L val tm = new TaskMetrics(accums) - assert(tm.shuffleReadMetrics.isDefined) - assert(tm.shuffleWriteMetrics.isEmpty) - assert(tm.inputMetrics.isEmpty) - assert(tm.outputMetrics.isEmpty) } test("existing values in shuffle write accums") { @@ -437,10 +357,6 @@ class TaskMetricsSuite extends SparkFunSuite { assert(swAccum.isDefined) swAccum.get.asInstanceOf[Accumulator[Long]] += 10L val tm = new TaskMetrics(accums) - assert(tm.shuffleReadMetrics.isEmpty) - assert(tm.shuffleWriteMetrics.isDefined) - assert(tm.inputMetrics.isEmpty) - assert(tm.outputMetrics.isEmpty) } test("existing values in input accums") { @@ -450,10 +366,6 @@ class TaskMetricsSuite extends SparkFunSuite { 
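Shuffle reads keep their per-dependency bookkeeping, but the registration call is now createTempShuffleReadMetrics() and the merged result is read straight off shuffleReadMetrics. A condensed sketch of that flow, mirroring the merge test in the hunks above (values are illustrative):

    import org.apache.spark.executor.TaskMetrics

    val tm = new TaskMetrics
    // One temporary metrics object per shuffle dependency read by the task.
    val dep1 = tm.createTempShuffleReadMetrics()
    val dep2 = tm.createTempShuffleReadMetrics()
    dep1.setRecordsRead(10L)
    dep2.setRecordsRead(5L)
    // Merging folds the temporaries into the single, always-present ShuffleReadMetrics.
    tm.mergeShuffleReadMetrics()
    assert(tm.shuffleReadMetrics.recordsRead == 15L)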
assert(inAccum.isDefined) inAccum.get.asInstanceOf[Accumulator[Long]] += 10L val tm = new TaskMetrics(accums) - assert(tm.shuffleReadMetrics.isEmpty) - assert(tm.shuffleWriteMetrics.isEmpty) - assert(tm.inputMetrics.isDefined) - assert(tm.outputMetrics.isEmpty) } test("existing values in output accums") { @@ -463,10 +375,6 @@ class TaskMetricsSuite extends SparkFunSuite { assert(outAccum.isDefined) outAccum.get.asInstanceOf[Accumulator[Long]] += 10L val tm4 = new TaskMetrics(accums) - assert(tm4.shuffleReadMetrics.isEmpty) - assert(tm4.shuffleWriteMetrics.isEmpty) - assert(tm4.inputMetrics.isEmpty) - assert(tm4.outputMetrics.isDefined) } test("from accumulator updates") { diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 056e5463a0..f8054f5fd7 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -25,16 +25,10 @@ import org.apache.commons.lang3.RandomUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{LongWritable, Text} -import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, - JobConf, LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, - Reporter, TextInputFormat => OldTextInputFormat} -import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat, - CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit} -import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader, - TaskAttemptContext} -import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat, - CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit, - FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} +import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf, LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter, TextInputFormat => OldTextInputFormat} +import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat, CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit} +import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat, CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit, FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} import org.scalatest.BeforeAndAfter @@ -103,40 +97,6 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext assert(bytesRead2 == bytesRead) } - /** - * This checks the situation where we have interleaved reads from - * different sources. Currently, we only accumulate from the first - * read method we find in the task. This test uses cartesian to create - * the interleaved reads. - * - * Once https://issues.apache.org/jira/browse/SPARK-5225 is fixed - * this test should break. 
- */ - test("input metrics with mixed read method") { - // prime the cache manager - val numPartitions = 2 - val rdd = sc.parallelize(1 to 100, numPartitions).cache() - rdd.collect() - - val rdd2 = sc.textFile(tmpFilePath, numPartitions) - - val bytesRead = runAndReturnBytesRead { - rdd.count() - } - val bytesRead2 = runAndReturnBytesRead { - rdd2.count() - } - - val cartRead = runAndReturnBytesRead { - rdd.cartesian(rdd2).count() - } - - assert(cartRead != 0) - assert(bytesRead != 0) - // We read from the first rdd of the cartesian once per partition. - assert(cartRead == bytesRead * numPartitions) - } - test("input metrics for new Hadoop API with coalesce") { val bytesRead = runAndReturnBytesRead { sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable], @@ -209,10 +169,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val metrics = taskEnd.taskMetrics - metrics.inputMetrics.foreach(inputRead += _.recordsRead) - metrics.outputMetrics.foreach(outputWritten += _.recordsWritten) - metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead) - metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten) + inputRead += metrics.inputMetrics.recordsRead + outputWritten += metrics.outputMetrics.recordsWritten + shuffleRead += metrics.shuffleReadMetrics.recordsRead + shuffleWritten += metrics.shuffleWriteMetrics.recordsWritten } }) @@ -272,19 +232,18 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext } private def runAndReturnBytesRead(job: => Unit): Long = { - runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.bytesRead)) + runAndReturnMetrics(job, _.taskMetrics.inputMetrics.bytesRead) } private def runAndReturnRecordsRead(job: => Unit): Long = { - runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.recordsRead)) + runAndReturnMetrics(job, _.taskMetrics.inputMetrics.recordsRead) } private def runAndReturnRecordsWritten(job: => Unit): Long = { - runAndReturnMetrics(job, _.taskMetrics.outputMetrics.map(_.recordsWritten)) + runAndReturnMetrics(job, _.taskMetrics.outputMetrics.recordsWritten) } - private def runAndReturnMetrics(job: => Unit, - collector: (SparkListenerTaskEnd) => Option[Long]): Long = { + private def runAndReturnMetrics(job: => Unit, collector: (SparkListenerTaskEnd) => Long): Long = { val taskMetrics = new ArrayBuffer[Long]() // Avoid receiving earlier taskEnd events @@ -292,7 +251,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - collector(taskEnd).foreach(taskMetrics += _) + taskMetrics += collector(taskEnd) } }) @@ -337,7 +296,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext val taskBytesWritten = new ArrayBuffer[Long]() sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - taskBytesWritten += taskEnd.taskMetrics.outputMetrics.get.bytesWritten + taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten } }) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index b854d742b5..5ba67afc0c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ 
-266,18 +266,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0L) if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) { - taskMetrics.inputMetrics should not be ('defined) - taskMetrics.outputMetrics should not be ('defined) - taskMetrics.shuffleWriteMetrics should be ('defined) - taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L) + assert(taskMetrics.shuffleWriteMetrics.bytesWritten > 0L) } if (stageInfo.rddInfos.exists(_.name == d4.name)) { - taskMetrics.shuffleReadMetrics should be ('defined) - val sm = taskMetrics.shuffleReadMetrics.get - sm.totalBlocksFetched should be (2*numSlices) - sm.localBlocksFetched should be (2*numSlices) - sm.remoteBlocksFetched should be (0) - sm.remoteBytesRead should be (0L) + assert(taskMetrics.shuffleReadMetrics.totalBlocksFetched == 2 * numSlices) + assert(taskMetrics.shuffleReadMetrics.localBlocksFetched == 2 * numSlices) + assert(taskMetrics.shuffleReadMetrics.remoteBlocksFetched == 0) + assert(taskMetrics.shuffleReadMetrics.remoteBytesRead == 0L) } } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 16418f855b..5132384a5e 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -144,7 +144,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte assert(outputFile.exists()) assert(outputFile.length() === 0) assert(temporaryFilesCreated.isEmpty) - val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get + val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics assert(shuffleWriteMetrics.bytesWritten === 0) assert(shuffleWriteMetrics.recordsWritten === 0) assert(taskMetrics.diskBytesSpilled === 0) @@ -168,7 +168,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte assert(writer.getPartitionLengths.sum === outputFile.length()) assert(writer.getPartitionLengths.count(_ == 0L) === 4) // should be 4 zero length files assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted - val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get + val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) assert(shuffleWriteMetrics.recordsWritten === records.length) assert(taskMetrics.diskBytesSpilled === 0) diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 7d4c0863bc..85c877e3dd 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -184,7 +184,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with val conf = new SparkConf() val listener = new JobProgressListener(conf) val taskMetrics = new TaskMetrics() - val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics() + val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics() assert(listener.stageIdToData.size === 0) // finish this task, 
should get updated shuffleRead @@ -272,10 +272,10 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with val accums = InternalAccumulator.createAll() accums.foreach(Accumulators.register) val taskMetrics = new TaskMetrics(accums) - val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics() - val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics() - val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop) - val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop) + val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics() + val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics + val inputMetrics = taskMetrics.inputMetrics + val outputMetrics = taskMetrics.outputMetrics shuffleReadMetrics.incRemoteBytesRead(base + 1) shuffleReadMetrics.incLocalBytesRead(base + 9) shuffleReadMetrics.incRemoteBlocksFetched(base + 2) @@ -322,12 +322,13 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with assert(stage1Data.inputBytes == 207) assert(stage0Data.outputBytes == 116) assert(stage1Data.outputBytes == 208) - assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get - .totalBlocksFetched == 2) - assert(stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.get - .totalBlocksFetched == 102) - assert(stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.get - .totalBlocksFetched == 202) + + assert( + stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 2) + assert( + stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 102) + assert( + stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 202) // task that was included in a heartbeat listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1), @@ -355,9 +356,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with assert(stage1Data.inputBytes == 614) assert(stage0Data.outputBytes == 416) assert(stage1Data.outputBytes == 616) - assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get - .totalBlocksFetched == 302) - assert(stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.get - .totalBlocksFetched == 402) + assert( + stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 302) + assert( + stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402) } } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index de6f408fa8..612c7c1954 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -197,49 +197,41 @@ class JsonProtocolSuite extends SparkFunSuite { test("InputMetrics backward compatibility") { // InputMetrics were added after 1.0.1. 
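One nuance visible in the JobProgressListenerSuite hunks above: the per-task metrics field in the UI data is still an Option (the .get after .metrics remains), so only the nested metric objects lose their Option wrapper. A sketch of the resulting access chain, with stageData standing in for the suite's stage0Data/stage1Data values:

    // The HashMap lookup and the Option-al task metrics still need .get;
    // the nested shuffle read metrics no longer do.
    val blocksFetched =
      stageData.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched
    assert(blocksFetched == 2)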
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false) - assert(metrics.inputMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) - assert(newMetrics.inputMetrics.isEmpty) } test("Input/Output records backwards compatibility") { // records read were added after 1.2 val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true, hasRecords = false) - assert(metrics.inputMetrics.nonEmpty) - assert(metrics.outputMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Records Read" } .removeField { case (field, _) => field == "Records Written" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) - assert(newMetrics.inputMetrics.get.recordsRead == 0) - assert(newMetrics.outputMetrics.get.recordsWritten == 0) + assert(newMetrics.inputMetrics.recordsRead == 0) + assert(newMetrics.outputMetrics.recordsWritten == 0) } test("Shuffle Read/Write records backwards compatibility") { // records read were added after 1.2 val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = false, hasRecords = false) - assert(metrics.shuffleReadMetrics.nonEmpty) - assert(metrics.shuffleWriteMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" } .removeField { case (field, _) => field == "Shuffle Records Written" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) - assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0) - assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0) + assert(newMetrics.shuffleReadMetrics.recordsRead == 0) + assert(newMetrics.shuffleWriteMetrics.recordsWritten == 0) } test("OutputMetrics backward compatibility") { // OutputMetrics were added after 1.1 val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true) - assert(metrics.outputMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Output Metrics" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) - assert(newMetrics.outputMetrics.isEmpty) } test("BlockManager events backward compatibility") { @@ -279,11 +271,10 @@ class JsonProtocolSuite extends SparkFunSuite { // Metrics about local shuffle bytes read were added in 1.3.1. 
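The backward-compatibility tests keep their shape (serialize, strip a field the old release lacked, deserialize, check the default), but the assertions simplify because round-tripped metrics can no longer come back as None; a missing field now just reads as zero. A sketch of the pattern using the suite's own makeTaskMetrics helper and json4s (the field name follows the hunk above):

    import org.json4s._
    import org.apache.spark.util.JsonProtocol

    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
      hasHadoopInput = true, hasOutput = true, hasRecords = false)
    val oldJson = JsonProtocol.taskMetricsToJson(metrics)
      .removeField { case (field, _) => field == "Records Read" }
    val reparsed = JsonProtocol.taskMetricsFromJson(oldJson)
    // With the field absent from the JSON, the value falls back to 0 instead of an empty Option.
    assert(reparsed.inputMetrics.recordsRead == 0)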
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = false, hasRecords = false) - assert(metrics.shuffleReadMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) - assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0) + assert(newMetrics.shuffleReadMetrics.localBytesRead == 0) } test("SparkListenerApplicationStart backwards compatibility") { @@ -423,7 +414,6 @@ class JsonProtocolSuite extends SparkFunSuite { }) testAccumValue(Some(RESULT_SIZE), 3L, JInt(3)) testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2)) - testAccumValue(Some(input.READ_METHOD), "aka", JString("aka")) testAccumValue(Some(UPDATED_BLOCK_STATUSES), blocks, blocksJson) // For anything else, we just cast the value to a string testAccumValue(Some("anything"), blocks, JString(blocks.toString)) @@ -619,12 +609,9 @@ private[spark] object JsonProtocolSuite extends Assertions { assert(metrics1.resultSerializationTime === metrics2.resultSerializationTime) assert(metrics1.memoryBytesSpilled === metrics2.memoryBytesSpilled) assert(metrics1.diskBytesSpilled === metrics2.diskBytesSpilled) - assertOptionEquals( - metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals) - assertOptionEquals( - metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals) - assertOptionEquals( - metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals) + assertEquals(metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics) + assertEquals(metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics) + assertEquals(metrics1.inputMetrics, metrics2.inputMetrics) assertBlocksEquals(metrics1.updatedBlockStatuses, metrics2.updatedBlockStatuses) } @@ -641,7 +628,6 @@ private[spark] object JsonProtocolSuite extends Assertions { } private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) { - assert(metrics1.readMethod === metrics2.readMethod) assert(metrics1.bytesRead === metrics2.bytesRead) } @@ -706,12 +692,13 @@ private[spark] object JsonProtocolSuite extends Assertions { } private def assertJsonStringEquals(expected: String, actual: String, metadata: String) { - val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "") - if (formatJsonString(expected) != formatJsonString(actual)) { + val expectedJson = pretty(parse(expected)) + val actualJson = pretty(parse(actual)) + if (expectedJson != actualJson) { // scalastyle:off // This prints something useful if the JSON strings don't match - println("=== EXPECTED ===\n" + pretty(parse(expected)) + "\n") - println("=== ACTUAL ===\n" + pretty(parse(actual)) + "\n") + println("=== EXPECTED ===\n" + expectedJson + "\n") + println("=== ACTUAL ===\n" + actualJson + "\n") // scalastyle:on throw new TestFailedException(s"$metadata JSON did not equal", 1) } @@ -740,22 +727,6 @@ private[spark] object JsonProtocolSuite extends Assertions { * Use different names for methods we pass in to assertSeqEquals or assertOptionEquals */ - private def assertShuffleReadEquals(r1: ShuffleReadMetrics, r2: ShuffleReadMetrics) { - assertEquals(r1, r2) - } - - private def assertShuffleWriteEquals(w1: ShuffleWriteMetrics, w2: ShuffleWriteMetrics) { - assertEquals(w1, w2) - } - - private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) { - assertEquals(i1, i2) - } - - private def assertTaskMetricsEquals(t1: TaskMetrics, t2: 
TaskMetrics) { - assertEquals(t1, t2) - } - private def assertBlocksEquals( blocks1: Seq[(BlockId, BlockStatus)], blocks2: Seq[(BlockId, BlockStatus)]) = { @@ -851,11 +822,11 @@ private[spark] object JsonProtocolSuite extends Assertions { t.incMemoryBytesSpilled(a + c) if (hasHadoopInput) { - val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop) + val inputMetrics = t.inputMetrics inputMetrics.setBytesRead(d + e + f) inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1) } else { - val sr = t.registerTempShuffleReadMetrics() + val sr = t.createTempShuffleReadMetrics() sr.incRemoteBytesRead(b + d) sr.incLocalBlocksFetched(e) sr.incFetchWaitTime(a + d) @@ -865,11 +836,10 @@ private[spark] object JsonProtocolSuite extends Assertions { t.mergeShuffleReadMetrics() } if (hasOutput) { - val outputMetrics = t.registerOutputMetrics(DataWriteMethod.Hadoop) - outputMetrics.setBytesWritten(a + b + c) - outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1) + t.outputMetrics.setBytesWritten(a + b + c) + t.outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) } else { - val sw = t.registerShuffleWriteMetrics() + val sw = t.shuffleWriteMetrics sw.incBytesWritten(a + b + c) sw.incWriteTime(b + c + d) sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) @@ -896,7 +866,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Stage Name": "greetings", | "Number of Tasks": 200, | "RDD Info": [], - | "ParentIDs" : [100, 200, 300], + | "Parent IDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -924,7 +894,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Ukraine": "Kiev" | } |} - """ + """.stripMargin private val stageCompletedJsonString = """ @@ -953,7 +923,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Disk Size": 501 | } | ], - | "ParentIDs" : [100, 200, 300], + | "Parent IDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -975,7 +945,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | ] | } |} - """ + """.stripMargin private val taskStartJsonString = """ @@ -1223,7 +1193,6 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Shuffle Records Written": 12 | }, | "Input Metrics": { - | "Data Read Method": "Hadoop", | "Bytes Read": 2100, | "Records Read": 21 | }, @@ -1244,7 +1213,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | ] | } |} - """ + """.stripMargin private val taskEndWithOutputJsonString = """ @@ -1304,12 +1273,10 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Memory Bytes Spilled": 800, | "Disk Bytes Spilled": 0, | "Input Metrics": { - | "Data Read Method": "Hadoop", | "Bytes Read": 2100, | "Records Read": 21 | }, | "Output Metrics": { - | "Data Write Method": "Hadoop", | "Bytes Written": 1200, | "Records Written": 12 | }, @@ -1330,7 +1297,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | ] | } |} - """ + """.stripMargin private val jobStartJsonString = """ @@ -1422,7 +1389,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Disk Size": 1001 | } | ], - | "ParentIDs" : [100, 200, 300], + | "Parent IDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -1498,7 +1465,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Disk Size": 1502 | } | ], - | "ParentIDs" : [100, 200, 300], + | "Parent IDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -1590,7 +1557,7 @@ 
private[spark] object JsonProtocolSuite extends Assertions { | "Disk Size": 2003 | } | ], - | "ParentIDs" : [100, 200, 300], + | "Parent IDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -1625,7 +1592,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Ukraine": "Kiev" | } |} - """ + """.stripMargin private val jobEndJsonString = """ @@ -1637,7 +1604,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Result": "JobSucceeded" | } |} - """ + """.stripMargin private val environmentUpdateJsonString = """ @@ -1658,7 +1625,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Super library": "/tmp/super_library" | } |} - """ + """.stripMargin private val blockManagerAddedJsonString = """ @@ -1672,7 +1639,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Maximum Memory": 500, | "Timestamp": 1 |} - """ + """.stripMargin private val blockManagerRemovedJsonString = """ @@ -1685,7 +1652,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | }, | "Timestamp": 2 |} - """ + """.stripMargin private val unpersistRDDJsonString = """ @@ -1693,7 +1660,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Event": "SparkListenerUnpersistRDD", | "RDD ID": 12345 |} - """ + """.stripMargin private val applicationStartJsonString = """ @@ -1705,7 +1672,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "User": "Garfield", | "App Attempt ID": "appAttempt" |} - """ + """.stripMargin private val applicationStartJsonWithLogUrlsString = """ @@ -1721,7 +1688,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "stdout" : "mystdout" | } |} - """ + """.stripMargin private val applicationEndJsonString = """ @@ -1729,7 +1696,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Event": "SparkListenerApplicationEnd", | "Timestamp": 42 |} - """ + """.stripMargin private val executorAddedJsonString = s""" @@ -1746,7 +1713,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | } | } |} - """ + """.stripMargin private val executorRemovedJsonString = s""" @@ -1756,7 +1723,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Executor ID": "exec2", | "Removed Reason": "test reason" |} - """ + """.stripMargin private val executorMetricsUpdateJsonString = s""" @@ -1830,16 +1797,16 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Name": "$UPDATED_BLOCK_STATUSES", | "Update": [ | { - | "BlockID": "rdd_0_0", + | "Block ID": "rdd_0_0", | "Status": { - | "StorageLevel": { - | "UseDisk": true, - | "UseMemory": true, + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, | "Deserialized": false, | "Replication": 2 | }, - | "MemorySize": 0, - | "DiskSize": 0 + | "Memory Size": 0, + | "Disk Size": 0 | } | } | ], @@ -1911,48 +1878,34 @@ private[spark] object JsonProtocolSuite extends Assertions { | }, | { | "ID": 18, - | "Name": "${input.READ_METHOD}", - | "Update": "Hadoop", - | "Internal": true, - | "Count Failed Values": true - | }, - | { - | "ID": 19, | "Name": "${input.BYTES_READ}", | "Update": 2100, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 20, + | "ID": 19, | "Name": "${input.RECORDS_READ}", | "Update": 21, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 21, - | "Name": "${output.WRITE_METHOD}", - | "Update": "Hadoop", - | "Internal": true, - | "Count Failed Values": true - | }, - | { - | "ID": 22, + | "ID": 20, | "Name": "${output.BYTES_WRITTEN}", | "Update": 1200, 
| "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 23, + | "ID": 21, | "Name": "${output.RECORDS_WRITTEN}", | "Update": 12, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 24, + | "ID": 22, | "Name": "$TEST_ACCUM", | "Update": 0, | "Internal": true, diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 71f337ce1f..7730823f94 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -630,7 +630,10 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty"), // [SPARK-14617] Remove deprecated APIs in TaskMetrics ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.InputMetrics$"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$") + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$"), + // [SPARK-14628] Simplify task metrics by always tracking read/write metrics + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.readMethod"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.writeMethod") ) case v if v.startsWith("1.6") => Seq( |