diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-11-09 22:29:03 -0800 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2014-11-09 22:29:03 -0800 |
commit | 3c2cff4b9464f8d7535564fcd194631a8e5bb0a5 (patch) | |
tree | 87535991ff8b6564c6d52263786f02d3c0cbce45 /core | |
parent | f8e5732307dcb1482d9bcf1162a1090ef9a7b913 (diff) | |
download | spark-3c2cff4b9464f8d7535564fcd194631a8e5bb0a5.tar.gz spark-3c2cff4b9464f8d7535564fcd194631a8e5bb0a5.tar.bz2 spark-3c2cff4b9464f8d7535564fcd194631a8e5bb0a5.zip |
SPARK-3179. Add task OutputMetrics.
Author: Sandy Ryza <sandy@cloudera.com>
This patch had conflicts when merged, resolved by
Committer: Kay Ousterhout <kayousterhout@gmail.com>
Closes #2968 from sryza/sandy-spark-3179 and squashes the following commits:
dce4784 [Sandy Ryza] More review feedback
8d350d1 [Sandy Ryza] Fix test against Hadoop 2.5+
e7c74d0 [Sandy Ryza] More review feedback
6cff9c4 [Sandy Ryza] Review feedback
fb2dde0 [Sandy Ryza] SPARK-3179
Diffstat (limited to 'core')
16 files changed, 346 insertions, 31 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index e28eaad8a5..60ee115e39 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy +import java.lang.reflect.Method import java.security.PrivilegedExceptionAction import org.apache.hadoop.conf.Configuration @@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging { */ private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration) : Option[() => Long] = { - val qualifiedPath = path.getFileSystem(conf).makeQualified(path) - val scheme = qualifiedPath.toUri().getScheme() - val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme)) try { - val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) - val statisticsDataClass = - Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData") - val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead") + val threadStats = getFileSystemThreadStatistics(path, conf) + val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead") val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum val baselineBytesRead = f() Some(() => f() - baselineBytesRead) @@ -151,6 +147,42 @@ class SparkHadoopUtil extends Logging { } } } + + /** + * Returns a function that can be called to find Hadoop FileSystem bytes written. If + * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will + * return the bytes written on r since t. Reflection is required because thread-level FileSystem + * statistics are only available as of Hadoop 2.5 (see HADOOP-10688). + * Returns None if the required method can't be found. + */ + private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration) + : Option[() => Long] = { + try { + val threadStats = getFileSystemThreadStatistics(path, conf) + val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten") + val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum + val baselineBytesWritten = f() + Some(() => f() - baselineBytesWritten) + } catch { + case e: NoSuchMethodException => { + logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e) + None + } + } + } + + private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = { + val qualifiedPath = path.getFileSystem(conf).makeQualified(path) + val scheme = qualifiedPath.toUri().getScheme() + val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme)) + stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + } + + private def getFileSystemThreadStatisticsMethod(methodName: String): Method = { + val statisticsDataClass = + Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData") + statisticsDataClass.getDeclaredMethod(methodName) + } } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 57bc2b40ce..51b5328cb4 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -83,6 +83,12 @@ class TaskMetrics extends Serializable { var inputMetrics: Option[InputMetrics] = None /** + * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much + * data was written are stored here. + */ + var outputMetrics: Option[OutputMetrics] = None + + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here. * This includes read metrics aggregated over all the task's shuffle dependencies. */ @@ -159,6 +165,16 @@ object DataReadMethod extends Enumeration with Serializable { /** * :: DeveloperApi :: + * Method by which output data was written. + */ +@DeveloperApi +object DataWriteMethod extends Enumeration with Serializable { + type DataWriteMethod = Value + val Hadoop = Value +} + +/** + * :: DeveloperApi :: * Metrics about reading input data. */ @DeveloperApi @@ -171,6 +187,18 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { /** * :: DeveloperApi :: + * Metrics about writing output data. + */ +@DeveloperApi +case class OutputMetrics(writeMethod: DataWriteMethod.Value) { + /** + * Total bytes written + */ + var bytesWritten: Long = 0L +} + +/** + * :: DeveloperApi :: * Metrics pertaining to shuffle data read in a given task. */ @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 462f0d6268..8c2c959e73 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -28,7 +28,7 @@ import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} @@ -40,6 +40,7 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.SparkContext._ import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.{DataWriteMethod, OutputMetrics} import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer @@ -962,30 +963,40 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => { + val config = wrappedConf.value // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt /* "reduce task" <split #> <attempt # = spark task #> */ val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, attemptNumber) - val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) + val hadoopContext = newTaskAttemptContext(config, attemptId) val format = outfmt.newInstance format match { - case c: Configurable => c.setConf(wrappedConf.value) + case c: Configurable => c.setConf(config) case _ => () } val committer = format.getOutputCommitter(hadoopContext) committer.setupTask(hadoopContext) + + val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config) + val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] try { + var recordsWritten = 0L while (iter.hasNext) { val pair = iter.next() writer.write(pair._1, pair._2) + + // Update bytes written metric every few records + maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten) + recordsWritten += 1 } } finally { writer.close(hadoopContext) } committer.commitTask(hadoopContext) + bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } 1 } : Int @@ -1006,6 +1017,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def saveAsHadoopDataset(conf: JobConf) { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf + val wrappedConf = new SerializableWritable(hadoopConf) val outputFormatInstance = hadoopConf.getOutputFormat val keyClass = hadoopConf.getOutputKeyClass val valueClass = hadoopConf.getOutputValueClass @@ -1033,27 +1045,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.preSetup() val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => { + val config = wrappedConf.value // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt + val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config) + writer.setup(context.stageId, context.partitionId, attemptNumber) writer.open() try { + var recordsWritten = 0L while (iter.hasNext) { val record = iter.next() writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) + + // Update bytes written metric every few records + maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten) + recordsWritten += 1 } } finally { writer.close() } writer.commit() + bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } } self.context.runJob(self, writeToFile) writer.commitJob() } + private def initHadoopOutputMetrics(context: TaskContext, config: Configuration) + : (OutputMetrics, Option[() => Long]) = { + val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir")) + .map(new Path(_)) + .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config)) + val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) + if (bytesWrittenCallback.isDefined) { + context.taskMetrics.outputMetrics = Some(outputMetrics) + } + (outputMetrics, bytesWrittenCallback) + } + + private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long], + outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { + if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0 + && bytesWrittenCallback.isDefined) { + bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } + } + } + /** * Return an RDD with the keys of each tuple. */ @@ -1070,3 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord) } + +private[spark] object PairRDDFunctions { + val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256 +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 4e3d9de540..3bb54855ba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -158,6 +158,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener " INPUT_BYTES=" + metrics.bytesRead case None => "" } + val outputMetrics = taskMetrics.outputMetrics match { + case Some(metrics) => + " OUTPUT_BYTES=" + metrics.bytesWritten + case None => "" + } val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match { case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + @@ -173,7 +178,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener " SHUFFLE_WRITE_TIME=" + metrics.shuffleWriteTime case None => "" } - stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + + stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + outputMetrics + shuffleReadMetrics + writeMetrics) } diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 51dc08f668..6f446c5a95 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -29,6 +29,8 @@ private[spark] object ToolTips { val INPUT = "Bytes read from Hadoop or from Spark storage." + val OUTPUT = "Bytes written to Hadoop." + val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage." val SHUFFLE_READ = diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index ba97630f02..dd1c2b78c4 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -48,6 +48,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp val executorToTasksFailed = HashMap[String, Int]() val executorToDuration = HashMap[String, Long]() val executorToInputBytes = HashMap[String, Long]() + val executorToOutputBytes = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() @@ -78,6 +79,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp executorToInputBytes(eid) = executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead } + metrics.outputMetrics.foreach { outputMetrics => + executorToOutputBytes(eid) = + executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten + } metrics.shuffleReadMetrics.foreach { shuffleRead => executorToShuffleRead(eid) = executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index f0e43fbf70..fa0f96bff3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -45,6 +45,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr <th>Failed Tasks</th> <th>Succeeded Tasks</th> <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> + <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th> <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th> <th>Shuffle Spill (Memory)</th> @@ -77,6 +78,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr <td>{v.succeededTasks}</td> <td sorttable_customkey={v.inputBytes.toString}> {Utils.bytesToString(v.inputBytes)}</td> + <td sorttable_customkey={v.outputBytes.toString}> + {Utils.bytesToString(v.outputBytes)}</td> <td sorttable_customkey={v.shuffleRead.toString}> {Utils.bytesToString(v.shuffleRead)}</td> <td sorttable_customkey={v.shuffleWrite.toString}> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index e3223403c1..8bbde51e18 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -259,6 +259,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.inputBytes += inputBytesDelta execSummary.inputBytes += inputBytesDelta + val outputBytesDelta = + (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L) + - oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L)) + stageData.outputBytes += outputBytesDelta + execSummary.outputBytes += outputBytesDelta + val diskSpillDelta = taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L) stageData.diskBytesSpilled += diskSpillDelta diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 250bddbe2f..16bc3f6c18 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -57,6 +57,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables val hasAccumulators = accumulables.size > 0 val hasInput = stageData.inputBytes > 0 + val hasOutput = stageData.outputBytes > 0 val hasShuffleRead = stageData.shuffleReadBytes > 0 val hasShuffleWrite = stageData.shuffleWriteBytes > 0 val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0 @@ -74,6 +75,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {Utils.bytesToString(stageData.inputBytes)} </li> }} + {if (hasOutput) { + <li> + <strong>Output: </strong> + {Utils.bytesToString(stageData.outputBytes)} + </li> + }} {if (hasShuffleRead) { <li> <strong>Shuffle read: </strong> @@ -162,6 +169,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++ {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ {if (hasInput) Seq(("Input", "")) else Nil} ++ + {if (hasOutput) Seq(("Output", "")) else Nil} ++ {if (hasShuffleRead) Seq(("Shuffle Read", "")) else Nil} ++ {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++ {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) @@ -172,7 +180,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val taskTable = UIUtils.listingTable( unzipped._1, - taskRow(hasAccumulators, hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), + taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite, + hasBytesSpilled), tasks, headerClasses = unzipped._2) // Excludes tasks which failed and have incomplete metrics @@ -260,6 +269,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } val inputQuantiles = <td>Input</td> +: getFormattedSizeQuantiles(inputSizes) + val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble + } + val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes) + val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } @@ -296,6 +310,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { </tr>, <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>, if (hasInput) <tr>{inputQuantiles}</tr> else Nil, + if (hasOutput) <tr>{outputQuantiles}</tr> else Nil, if (hasShuffleRead) <tr>{shuffleReadQuantiles}</tr> else Nil, if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil, if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil, @@ -328,6 +343,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def taskRow( hasAccumulators: Boolean, hasInput: Boolean, + hasOutput: Boolean, hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { @@ -351,6 +367,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") .getOrElse("") + val maybeOutput = metrics.flatMap(_.outputMetrics) + val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("") + val outputReadable = maybeOutput + .map(m => s"${Utils.bytesToString(m.bytesWritten)}") + .getOrElse("") + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") @@ -417,6 +439,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {inputReadable} </td> }} + {if (hasOutput) { + <td sorttable_customkey={outputSortable}> + {outputReadable} + </td> + }} {if (hasShuffleRead) { <td sorttable_customkey={shuffleReadSortable}> {shuffleReadReadable} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 3b4866e059..eae542df85 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -45,6 +45,7 @@ private[ui] class StageTableBase( <th>Duration</th> <th>Tasks: Succeeded/Total</th> <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> + <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th> <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> <th> <!-- Place the shuffle write tooltip on the left (rather than the default position @@ -151,6 +152,8 @@ private[ui] class StageTableBase( val inputRead = stageData.inputBytes val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else "" + val outputWrite = stageData.outputBytes + val outputWriteWithUnit = if (outputWrite > 0) Utils.bytesToString(outputWrite) else "" val shuffleRead = stageData.shuffleReadBytes val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else "" val shuffleWrite = stageData.shuffleWriteBytes @@ -179,6 +182,7 @@ private[ui] class StageTableBase( stageData.numFailedTasks, s.numTasks)} </td> <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td> + <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td> <td sorttable_customkey={shuffleRead.toString}>{shuffleReadWithUnit}</td> <td sorttable_customkey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td> } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index e2813f8eb5..2f7d618df5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -31,6 +31,7 @@ private[jobs] object UIData { var failedTasks : Int = 0 var succeededTasks : Int = 0 var inputBytes : Long = 0 + var outputBytes : Long = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 var memoryBytesSpilled : Long = 0 @@ -53,6 +54,7 @@ private[jobs] object UIData { var executorRunTime: Long = _ var inputBytes: Long = _ + var outputBytes: Long = _ var shuffleReadBytes: Long = _ var shuffleWriteBytes: Long = _ var memoryBytesSpilled: Long = _ diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index f15d0c8566..7e536edfe8 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -26,9 +26,7 @@ import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ - -import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics, - ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.executor._ import org.apache.spark.scheduler._ import org.apache.spark.storage._ import org.apache.spark._ @@ -232,6 +230,8 @@ private[spark] object JsonProtocol { taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) val inputMetrics = taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing) + val outputMetrics = + taskMetrics.outputMetrics.map(outputMetricsToJson).getOrElse(JNothing) val updatedBlocks = taskMetrics.updatedBlocks.map { blocks => JArray(blocks.toList.map { case (id, status) => @@ -250,6 +250,7 @@ private[spark] object JsonProtocol { ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~ ("Input Metrics" -> inputMetrics) ~ + ("Output Metrics" -> outputMetrics) ~ ("Updated Blocks" -> updatedBlocks) } @@ -270,6 +271,11 @@ private[spark] object JsonProtocol { ("Bytes Read" -> inputMetrics.bytesRead) } + def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = { + ("Data Write Method" -> outputMetrics.writeMethod.toString) ~ + ("Bytes Written" -> outputMetrics.bytesWritten) + } + def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { val reason = Utils.getFormattedClassName(taskEndReason) val json: JObject = taskEndReason match { @@ -579,6 +585,8 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) metrics.inputMetrics = Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson) + metrics.outputMetrics = + Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson) metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value => value.extract[List[JValue]].map { block => @@ -613,6 +621,13 @@ private[spark] object JsonProtocol { metrics } + def outputMetricsFromJson(json: JValue): OutputMetrics = { + val metrics = new OutputMetrics( + DataWriteMethod.withName((json \ "Data Write Method").extract[String])) + metrics.bytesWritten = (json \ "Bytes Written").extract[Long] + metrics + } + def taskEndReasonFromJson(json: JValue): TaskEndReason = { val success = Utils.getFormattedClassName(Success) val resubmitted = Utils.getFormattedClassName(Resubmitted) diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 48c386ba04..ca226fd4e6 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -17,16 +17,21 @@ package org.apache.spark.metrics -import org.scalatest.FunSuite +import java.io.{FileWriter, PrintWriter, File} import org.apache.spark.SharedSparkContext +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener} -import scala.collection.mutable.ArrayBuffer +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers -import java.io.{FileWriter, PrintWriter, File} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} + +import scala.collection.mutable.ArrayBuffer -class InputMetricsSuite extends FunSuite with SharedSparkContext { +class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with ShouldMatchers { test("input metrics when reading text file with single split") { val file = new File(getClass.getSimpleName + ".txt") val pw = new PrintWriter(new FileWriter(file)) @@ -73,4 +78,32 @@ class InputMetricsSuite extends FunSuite with SharedSparkContext { assert(taskBytesRead.length == 2) assert(taskBytesRead.sum >= file.length()) } + + test("output metrics when writing text file") { + val fs = FileSystem.getLocal(new Configuration()) + val outPath = new Path(fs.getWorkingDirectory, "outdir") + + if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) { + val taskBytesWritten = new ArrayBuffer[Long]() + sc.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + taskBytesWritten += taskEnd.taskMetrics.outputMetrics.get.bytesWritten + } + }) + + val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2) + + try { + rdd.saveAsTextFile(outPath.toString) + sc.listenerBus.waitUntilEmpty(500) + assert(taskBytesWritten.length == 2) + val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS") + taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) => + assert(bytes >= fileStatus.getLen) + } + } finally { + fs.delete(outPath, true) + } + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index ab35e8edc4..abe0dc35b0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -252,6 +252,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers taskMetrics.resultSize should be > (0l) if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) { taskMetrics.inputMetrics should not be ('defined) + taskMetrics.outputMetrics should not be ('defined) taskMetrics.shuffleWriteMetrics should be ('defined) taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l) } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 2608ad4b32..7c102cc7f4 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -159,6 +159,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.inputMetrics = Some(inputMetrics) inputMetrics.bytesRead = base + 7 + val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) + taskMetrics.outputMetrics = Some(outputMetrics) + outputMetrics.bytesWritten = base + 8 taskMetrics } @@ -193,6 +196,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(stage1Data.memoryBytesSpilled == 206) assert(stage0Data.inputBytes == 114) assert(stage1Data.inputBytes == 207) + assert(stage0Data.outputBytes == 116) + assert(stage1Data.outputBytes == 208) assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get .totalBlocksFetched == 2) assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get @@ -221,6 +226,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(stage1Data.memoryBytesSpilled == 612) assert(stage0Data.inputBytes == 414) assert(stage1Data.inputBytes == 614) + assert(stage0Data.outputBytes == 416) + assert(stage1Data.outputBytes == 616) assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get .totalBlocksFetched == 302) assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 39e69851e7..50f42054b9 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -21,9 +21,6 @@ import java.util.Properties import scala.collection.Map -import org.json4s.DefaultFormats -import org.json4s.JsonDSL._ -import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.scalatest.FunSuite @@ -43,10 +40,13 @@ class JsonProtocolSuite extends FunSuite { SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), - makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false)) + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false, hasOutput = false)) val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), - makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true)) + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = false)) + val taskEndWithOutput = SparkListenerTaskEnd(1, 0, "ResultTask", Success, + makeTaskInfo(123L, 234, 67, 345L, false), + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( @@ -69,6 +69,7 @@ class JsonProtocolSuite extends FunSuite { testEvent(taskGettingResult, taskGettingResultJsonString) testEvent(taskEnd, taskEndJsonString) testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString) + testEvent(taskEndWithOutput, taskEndWithOutputJsonString) testEvent(jobStart, jobStartJsonString) testEvent(jobEnd, jobEndJsonString) testEvent(environmentUpdate, environmentUpdateJsonString) @@ -83,7 +84,8 @@ class JsonProtocolSuite extends FunSuite { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) - testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false)) + testTaskMetrics(makeTaskMetrics( + 33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) // StorageLevel @@ -154,7 +156,7 @@ class JsonProtocolSuite extends FunSuite { test("InputMetrics backward compatibility") { // InputMetrics were added after 1.0.1. - val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true) + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false) assert(metrics.inputMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } @@ -162,6 +164,16 @@ class JsonProtocolSuite extends FunSuite { assert(newMetrics.inputMetrics.isEmpty) } + test("OutputMetrics backward compatibility") { + // OutputMetrics were added after 1.1 + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true) + assert(metrics.outputMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Output Metrics" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.outputMetrics.isEmpty) + } + test("BlockManager events backward compatibility") { // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property. val blockManagerAdded = SparkListenerBlockManagerAdded(1L, @@ -581,9 +593,9 @@ class JsonProtocolSuite extends FunSuite { d: Long, e: Int, f: Int, - hasHadoopInput: Boolean) = { + hasHadoopInput: Boolean, + hasOutput: Boolean) = { val t = new TaskMetrics - val sw = new ShuffleWriteMetrics t.hostname = "localhost" t.executorDeserializeTime = a t.executorRunTime = b @@ -604,9 +616,16 @@ class JsonProtocolSuite extends FunSuite { sr.remoteBlocksFetched = f t.setShuffleReadMetrics(Some(sr)) } - sw.shuffleBytesWritten = a + b + c - sw.shuffleWriteTime = b + c + d - t.shuffleWriteMetrics = Some(sw) + if (hasOutput) { + val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) + outputMetrics.bytesWritten = a + b + c + t.outputMetrics = Some(outputMetrics) + } else { + val sw = new ShuffleWriteMetrics + sw.shuffleBytesWritten = a + b + c + sw.shuffleWriteTime = b + c + d + t.shuffleWriteMetrics = Some(sw) + } // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i)) @@ -946,6 +965,87 @@ class JsonProtocolSuite extends FunSuite { |} """ + private val taskEndWithOutputJsonString = + """ + |{ + | "Event": "SparkListenerTaskEnd", + | "Stage ID": 1, + | "Stage Attempt ID": 0, + | "Task Type": "ResultTask", + | "Task End Reason": { + | "Reason": "Success" + | }, + | "Task Info": { + | "Task ID": 123, + | "Index": 234, + | "Attempt": 67, + | "Launch Time": 345, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": false, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" + | } + | ] + | }, + | "Task Metrics": { + | "Host Name": "localhost", + | "Executor Deserialize Time": 300, + | "Executor Run Time": 400, + | "Result Size": 500, + | "JVM GC Time": 600, + | "Result Serialization Time": 700, + | "Memory Bytes Spilled": 800, + | "Disk Bytes Spilled": 0, + | "Input Metrics": { + | "Data Read Method": "Hadoop", + | "Bytes Read": 2100 + | }, + | "Output Metrics": { + | "Data Write Method": "Hadoop", + | "Bytes Written": 1200 + | }, + | "Updated Blocks": [ + | { + | "Block ID": "rdd_0_0", + | "Status": { + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": false, + | "Replication": 2 + | }, + | "Memory Size": 0, + | "Tachyon Size": 0, + | "Disk Size": 0 + | } + | } + | ] + | } + |} + """ + private val jobStartJsonString = """ |{ |