author     Sandy Ryza <sandy@cloudera.com>          2014-11-09 22:29:03 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com> 2014-11-09 22:29:03 -0800
commit     3c2cff4b9464f8d7535564fcd194631a8e5bb0a5 (patch)
tree       87535991ff8b6564c6d52263786f02d3c0cbce45
parent     f8e5732307dcb1482d9bcf1162a1090ef9a7b913 (diff)
SPARK-3179. Add task OutputMetrics.
Author: Sandy Ryza <sandy@cloudera.com>

This patch had conflicts when merged, resolved by
Committer: Kay Ousterhout <kayousterhout@gmail.com>

Closes #2968 from sryza/sandy-spark-3179 and squashes the following commits:

dce4784 [Sandy Ryza] More review feedback
8d350d1 [Sandy Ryza] Fix test against Hadoop 2.5+
e7c74d0 [Sandy Ryza] More review feedback
6cff9c4 [Sandy Ryza] Review feedback
fb2dde0 [Sandy Ryza] SPARK-3179
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 51
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 21
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala (renamed from core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala) | 41
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 1
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 124
16 files changed, 346 insertions, 31 deletions
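
For orientation before the diff itself, a minimal sketch (not part of this patch) of how a listener might consume the new per-task output metrics. Only TaskMetrics.outputMetrics, OutputMetrics.bytesWritten, and the existing SparkListener/addSparkListener hooks come from Spark; the listener class name and the accumulation logic are illustrative.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: sums bytes written across completed tasks.
// outputMetrics is only populated when a task wrote through a Hadoop
// OutputFormat and the cluster runs Hadoop 2.5+.
class OutputBytesListener extends SparkListener {
  @volatile var totalBytesWritten = 0L
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    for (metrics <- Option(taskEnd.taskMetrics); out <- metrics.outputMetrics) {
      totalBytesWritten += out.bytesWritten
    }
  }
}

// Usage: register before running a save job.
// sc.addSparkListener(new OutputBytesListener())
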
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e28eaad8a5..60ee115e39 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy
+import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging {
*/
private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
- val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
- val scheme = qualifiedPath.toUri().getScheme()
- val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
try {
- val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
- val statisticsDataClass =
- Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
- val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
+ val threadStats = getFileSystemThreadStatistics(path, conf)
+ val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@ class SparkHadoopUtil extends Logging {
}
}
}
+
+ /**
+ * Returns a function that can be called to find Hadoop FileSystem bytes written. If
+ * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
+ * return the bytes written on r since t. Reflection is required because thread-level FileSystem
+ * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
+ * Returns None if the required method can't be found.
+ */
+ private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
+ : Option[() => Long] = {
+ try {
+ val threadStats = getFileSystemThreadStatistics(path, conf)
+ val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
+ val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
+ val baselineBytesWritten = f()
+ Some(() => f() - baselineBytesWritten)
+ } catch {
+ case e: NoSuchMethodException => {
+ logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
+ None
+ }
+ }
+ }
+
+ private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
+ val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
+ val scheme = qualifiedPath.toUri().getScheme()
+ val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+ stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+ }
+
+ private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
+ val statisticsDataClass =
+ Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+ statisticsDataClass.getDeclaredMethod(methodName)
+ }
}
object SparkHadoopUtil {
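
Illustrative usage of the new callback (not part of the patch; `outputPath` and `hadoopConf` are placeholders, and the method is private[spark], so this is only meaningful inside Spark internals). The callback snapshots the thread's FileSystem statistics when created, so each later call returns the bytes written by this thread since that point; None is returned on Hadoop versions older than 2.5.

val callback: Option[() => Long] =
  SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outputPath, hadoopConf)

// ... perform Hadoop writes on this thread ...

// Bytes written by this thread since the callback was created (0 if unavailable).
val bytesWrittenSinceSnapshot: Long = callback.map(f => f()).getOrElse(0L)
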
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 57bc2b40ce..51b5328cb4 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -83,6 +83,12 @@ class TaskMetrics extends Serializable {
var inputMetrics: Option[InputMetrics] = None
/**
+ * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
+ * data was written are stored here.
+ */
+ var outputMetrics: Option[OutputMetrics] = None
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
* This includes read metrics aggregated over all the task's shuffle dependencies.
*/
@@ -159,6 +165,16 @@ object DataReadMethod extends Enumeration with Serializable {
/**
* :: DeveloperApi ::
+ * Method by which output data was written.
+ */
+@DeveloperApi
+object DataWriteMethod extends Enumeration with Serializable {
+ type DataWriteMethod = Value
+ val Hadoop = Value
+}
+
+/**
+ * :: DeveloperApi ::
* Metrics about reading input data.
*/
@DeveloperApi
@@ -171,6 +187,18 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
/**
* :: DeveloperApi ::
+ * Metrics about writing output data.
+ */
+@DeveloperApi
+case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+ /**
+ * Total bytes written
+ */
+ var bytesWritten: Long = 0L
+}
+
+/**
+ * :: DeveloperApi ::
* Metrics pertaining to shuffle data read in a given task.
*/
@DeveloperApi
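
How the new classes fit together, mirroring the usage in JobProgressListenerSuite further down (the byte count here is arbitrary):

import org.apache.spark.executor.{DataWriteMethod, OutputMetrics, TaskMetrics}

// Attach output metrics to a task's metrics; bytesWritten is mutable and is
// updated as the task writes.
val taskMetrics = new TaskMetrics
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.bytesWritten = 1024L
taskMetrics.outputMetrics = Some(outputMetrics)
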
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 462f0d6268..8c2c959e73 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.reflect.ClassTag
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -40,6 +40,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
@@ -962,30 +963,40 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
+ val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
- val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+ val hadoopContext = newTaskAttemptContext(config, attemptId)
val format = outfmt.newInstance
format match {
- case c: Configurable => c.setConf(wrappedConf.value)
+ case c: Configurable => c.setConf(config)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
+
+ val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
try {
+ var recordsWritten = 0L
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)
+
+ // Update bytes written metric every few records
+ maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+ recordsWritten += 1
}
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
+ bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
1
} : Int
@@ -1006,6 +1017,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf) {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
+ val wrappedConf = new SerializableWritable(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
@@ -1033,27 +1045,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.preSetup()
val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
+ val config = wrappedConf.value
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+ val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
try {
+ var recordsWritten = 0L
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+
+ // Update bytes written metric every few records
+ maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+ recordsWritten += 1
}
} finally {
writer.close()
}
writer.commit()
+ bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
}
self.context.runJob(self, writeToFile)
writer.commitJob()
}
+ private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
+ : (OutputMetrics, Option[() => Long]) = {
+ val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
+ .map(new Path(_))
+ .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
+ val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
+ if (bytesWrittenCallback.isDefined) {
+ context.taskMetrics.outputMetrics = Some(outputMetrics)
+ }
+ (outputMetrics, bytesWrittenCallback)
+ }
+
+ private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
+ outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
+ if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
+ && bytesWrittenCallback.isDefined) {
+ bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+ }
+ }
+
/**
* Return an RDD with the keys of each tuple.
*/
@@ -1070,3 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
}
+
+private[spark] object PairRDDFunctions {
+ val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+}
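
A self-contained sketch of the throttled-update pattern used in both write paths above (the helper name is hypothetical; the real code lives inline in writeShard/writeToFile):

import org.apache.spark.executor.OutputMetrics

// Poll the filesystem bytes-written counter every 256 records
// (RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES) instead of on every record,
// then take a final reading after the last record.
def writeAllWithMetrics[T](records: Iterator[T],
                           write: T => Unit,
                           bytesWrittenCallback: Option[() => Long],
                           outputMetrics: OutputMetrics): Unit = {
  var recordsWritten = 0L
  while (records.hasNext) {
    write(records.next())
    if (recordsWritten % 256 == 0) {
      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
    }
    recordsWritten += 1
  }
  bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
}
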
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 4e3d9de540..3bb54855ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -158,6 +158,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" INPUT_BYTES=" + metrics.bytesRead
case None => ""
}
+ val outputMetrics = taskMetrics.outputMetrics match {
+ case Some(metrics) =>
+ " OUTPUT_BYTES=" + metrics.bytesWritten
+ case None => ""
+ }
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -173,7 +178,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" SHUFFLE_WRITE_TIME=" + metrics.shuffleWriteTime
case None => ""
}
- stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics +
+ stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + outputMetrics +
shuffleReadMetrics + writeMetrics)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 51dc08f668..6f446c5a95 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -29,6 +29,8 @@ private[spark] object ToolTips {
val INPUT = "Bytes read from Hadoop or from Spark storage."
+ val OUTPUT = "Bytes written to Hadoop."
+
val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
val SHUFFLE_READ =
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index ba97630f02..dd1c2b78c4 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,6 +48,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToInputBytes = HashMap[String, Long]()
+ val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
@@ -78,6 +79,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
executorToInputBytes(eid) =
executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
}
+ metrics.outputMetrics.foreach { outputMetrics =>
+ executorToOutputBytes(eid) =
+ executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
+ }
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index f0e43fbf70..fa0f96bff3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -45,6 +45,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
+ <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th>
<th>Shuffle Spill (Memory)</th>
@@ -77,6 +78,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
<td>{v.succeededTasks}</td>
<td sorttable_customkey={v.inputBytes.toString}>
{Utils.bytesToString(v.inputBytes)}</td>
+ <td sorttable_customkey={v.outputBytes.toString}>
+ {Utils.bytesToString(v.outputBytes)}</td>
<td sorttable_customkey={v.shuffleRead.toString}>
{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customkey={v.shuffleWrite.toString}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index e3223403c1..8bbde51e18 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -259,6 +259,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta
+ val outputBytesDelta =
+ (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
+ - oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
+ stageData.outputBytes += outputBytesDelta
+ execSummary.outputBytes += outputBytesDelta
+
val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 250bddbe2f..16bc3f6c18 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -57,6 +57,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
val hasAccumulators = accumulables.size > 0
val hasInput = stageData.inputBytes > 0
+ val hasOutput = stageData.outputBytes > 0
val hasShuffleRead = stageData.shuffleReadBytes > 0
val hasShuffleWrite = stageData.shuffleWriteBytes > 0
val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
@@ -74,6 +75,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{Utils.bytesToString(stageData.inputBytes)}
</li>
}}
+ {if (hasOutput) {
+ <li>
+ <strong>Output: </strong>
+ {Utils.bytesToString(stageData.outputBytes)}
+ </li>
+ }}
{if (hasShuffleRead) {
<li>
<strong>Shuffle read: </strong>
@@ -162,6 +169,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
{if (hasInput) Seq(("Input", "")) else Nil} ++
+ {if (hasOutput) Seq(("Output", "")) else Nil} ++
{if (hasShuffleRead) Seq(("Shuffle Read", "")) else Nil} ++
{if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++
{if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
@@ -172,7 +180,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val taskTable = UIUtils.listingTable(
unzipped._1,
- taskRow(hasAccumulators, hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled),
+ taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite,
+ hasBytesSpilled),
tasks,
headerClasses = unzipped._2)
// Excludes tasks which failed and have incomplete metrics
@@ -260,6 +269,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
val inputQuantiles = <td>Input</td> +: getFormattedSizeQuantiles(inputSizes)
+ val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ }
+ val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes)
+
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
@@ -296,6 +310,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</tr>,
<tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
if (hasInput) <tr>{inputQuantiles}</tr> else Nil,
+ if (hasOutput) <tr>{outputQuantiles}</tr> else Nil,
if (hasShuffleRead) <tr>{shuffleReadQuantiles}</tr> else Nil,
if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
@@ -328,6 +343,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def taskRow(
hasAccumulators: Boolean,
hasInput: Boolean,
+ hasOutput: Boolean,
hasShuffleRead: Boolean,
hasShuffleWrite: Boolean,
hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
@@ -351,6 +367,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
.map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
.getOrElse("")
+ val maybeOutput = metrics.flatMap(_.outputMetrics)
+ val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
+ val outputReadable = maybeOutput
+ .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
+ .getOrElse("")
+
val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -417,6 +439,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{inputReadable}
</td>
}}
+ {if (hasOutput) {
+ <td sorttable_customkey={outputSortable}>
+ {outputReadable}
+ </td>
+ }}
{if (hasShuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 3b4866e059..eae542df85 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -45,6 +45,7 @@ private[ui] class StageTableBase(
<th>Duration</th>
<th>Tasks: Succeeded/Total</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
+ <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th>
<!-- Place the shuffle write tooltip on the left (rather than the default position
@@ -151,6 +152,8 @@ private[ui] class StageTableBase(
val inputRead = stageData.inputBytes
val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+ val outputWrite = stageData.outputBytes
+ val outputWriteWithUnit = if (outputWrite > 0) Utils.bytesToString(outputWrite) else ""
val shuffleRead = stageData.shuffleReadBytes
val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
val shuffleWrite = stageData.shuffleWriteBytes
@@ -179,6 +182,7 @@ private[ui] class StageTableBase(
stageData.numFailedTasks, s.numTasks)}
</td>
<td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
+ <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td>
<td sorttable_customkey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
<td sorttable_customkey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index e2813f8eb5..2f7d618df5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -31,6 +31,7 @@ private[jobs] object UIData {
var failedTasks : Int = 0
var succeededTasks : Int = 0
var inputBytes : Long = 0
+ var outputBytes : Long = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
var memoryBytesSpilled : Long = 0
@@ -53,6 +54,7 @@ private[jobs] object UIData {
var executorRunTime: Long = _
var inputBytes: Long = _
+ var outputBytes: Long = _
var shuffleReadBytes: Long = _
var shuffleWriteBytes: Long = _
var memoryBytesSpilled: Long = _
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f15d0c8566..7e536edfe8 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -26,9 +26,7 @@ import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
-
-import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
- ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark._
@@ -232,6 +230,8 @@ private[spark] object JsonProtocol {
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
val inputMetrics =
taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
+ val outputMetrics =
+ taskMetrics.outputMetrics.map(outputMetricsToJson).getOrElse(JNothing)
val updatedBlocks =
taskMetrics.updatedBlocks.map { blocks =>
JArray(blocks.toList.map { case (id, status) =>
@@ -250,6 +250,7 @@ private[spark] object JsonProtocol {
("Shuffle Read Metrics" -> shuffleReadMetrics) ~
("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
("Input Metrics" -> inputMetrics) ~
+ ("Output Metrics" -> outputMetrics) ~
("Updated Blocks" -> updatedBlocks)
}
@@ -270,6 +271,11 @@ private[spark] object JsonProtocol {
("Bytes Read" -> inputMetrics.bytesRead)
}
+ def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = {
+ ("Data Write Method" -> outputMetrics.writeMethod.toString) ~
+ ("Bytes Written" -> outputMetrics.bytesWritten)
+ }
+
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
val reason = Utils.getFormattedClassName(taskEndReason)
val json: JObject = taskEndReason match {
@@ -579,6 +585,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
+ metrics.outputMetrics =
+ Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
@@ -613,6 +621,13 @@ private[spark] object JsonProtocol {
metrics
}
+ def outputMetricsFromJson(json: JValue): OutputMetrics = {
+ val metrics = new OutputMetrics(
+ DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
+ metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
+ metrics
+ }
+
def taskEndReasonFromJson(json: JValue): TaskEndReason = {
val success = Utils.getFormattedClassName(Success)
val resubmitted = Utils.getFormattedClassName(Resubmitted)
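
A round-trip sketch for the new JSON (de)serializers (JsonProtocol is private[spark], so this only compiles inside Spark's own packages; the byte count is arbitrary):

import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
import org.apache.spark.util.JsonProtocol

val metrics = new OutputMetrics(DataWriteMethod.Hadoop)
metrics.bytesWritten = 1200L

// Serializes to: {"Data Write Method": "Hadoop", "Bytes Written": 1200}
val json = JsonProtocol.outputMetricsToJson(metrics)
val roundTripped = JsonProtocol.outputMetricsFromJson(json)
assert(roundTripped.writeMethod == DataWriteMethod.Hadoop)
assert(roundTripped.bytesWritten == 1200L)
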
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 48c386ba04..ca226fd4e6 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -17,16 +17,21 @@
package org.apache.spark.metrics
-import org.scalatest.FunSuite
+import java.io.{FileWriter, PrintWriter, File}
import org.apache.spark.SharedSparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
-import scala.collection.mutable.ArrayBuffer
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
-import java.io.{FileWriter, PrintWriter, File}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+import scala.collection.mutable.ArrayBuffer
-class InputMetricsSuite extends FunSuite with SharedSparkContext {
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with ShouldMatchers {
test("input metrics when reading text file with single split") {
val file = new File(getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(file))
@@ -73,4 +78,32 @@ class InputMetricsSuite extends FunSuite with SharedSparkContext {
assert(taskBytesRead.length == 2)
assert(taskBytesRead.sum >= file.length())
}
+
+ test("output metrics when writing text file") {
+ val fs = FileSystem.getLocal(new Configuration())
+ val outPath = new Path(fs.getWorkingDirectory, "outdir")
+
+ if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
+ val taskBytesWritten = new ArrayBuffer[Long]()
+ sc.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ taskBytesWritten += taskEnd.taskMetrics.outputMetrics.get.bytesWritten
+ }
+ })
+
+ val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+
+ try {
+ rdd.saveAsTextFile(outPath.toString)
+ sc.listenerBus.waitUntilEmpty(500)
+ assert(taskBytesWritten.length == 2)
+ val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
+ taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
+ assert(bytes >= fileStatus.getLen)
+ }
+ } finally {
+ fs.delete(outPath, true)
+ }
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index ab35e8edc4..abe0dc35b0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -252,6 +252,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
taskMetrics.resultSize should be > (0l)
if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
taskMetrics.inputMetrics should not be ('defined)
+ taskMetrics.outputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 2608ad4b32..7c102cc7f4 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -159,6 +159,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.bytesRead = base + 7
+ val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
+ taskMetrics.outputMetrics = Some(outputMetrics)
+ outputMetrics.bytesWritten = base + 8
taskMetrics
}
@@ -193,6 +196,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(stage1Data.memoryBytesSpilled == 206)
assert(stage0Data.inputBytes == 114)
assert(stage1Data.inputBytes == 207)
+ assert(stage0Data.outputBytes == 116)
+ assert(stage1Data.outputBytes == 208)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 2)
assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -221,6 +226,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(stage1Data.memoryBytesSpilled == 612)
assert(stage0Data.inputBytes == 414)
assert(stage1Data.inputBytes == 614)
+ assert(stage0Data.outputBytes == 416)
+ assert(stage1Data.outputBytes == 616)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 302)
assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 39e69851e7..50f42054b9 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -21,9 +21,6 @@ import java.util.Properties
import scala.collection.Map
-import org.json4s.DefaultFormats
-import org.json4s.JsonDSL._
-import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.scalatest.FunSuite
@@ -43,10 +40,13 @@ class JsonProtocolSuite extends FunSuite {
SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
makeTaskInfo(123L, 234, 67, 345L, false),
- makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false, hasOutput = false))
val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
makeTaskInfo(123L, 234, 67, 345L, false),
- makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = false))
+ val taskEndWithOutput = SparkListenerTaskEnd(1, 0, "ResultTask", Success,
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -69,6 +69,7 @@ class JsonProtocolSuite extends FunSuite {
testEvent(taskGettingResult, taskGettingResultJsonString)
testEvent(taskEnd, taskEndJsonString)
testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
+ testEvent(taskEndWithOutput, taskEndWithOutputJsonString)
testEvent(jobStart, jobStartJsonString)
testEvent(jobEnd, jobEndJsonString)
testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -83,7 +84,8 @@ class JsonProtocolSuite extends FunSuite {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
- testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
+ testTaskMetrics(makeTaskMetrics(
+ 33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
// StorageLevel
@@ -154,7 +156,7 @@ class JsonProtocolSuite extends FunSuite {
test("InputMetrics backward compatibility") {
// InputMetrics were added after 1.0.1.
- val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false)
assert(metrics.inputMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
@@ -162,6 +164,16 @@ class JsonProtocolSuite extends FunSuite {
assert(newMetrics.inputMetrics.isEmpty)
}
+ test("OutputMetrics backward compatibility") {
+ // OutputMetrics were added after 1.1
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
+ assert(metrics.outputMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Output Metrics" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.outputMetrics.isEmpty)
+ }
+
test("BlockManager events backward compatibility") {
// SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
@@ -581,9 +593,9 @@ class JsonProtocolSuite extends FunSuite {
d: Long,
e: Int,
f: Int,
- hasHadoopInput: Boolean) = {
+ hasHadoopInput: Boolean,
+ hasOutput: Boolean) = {
val t = new TaskMetrics
- val sw = new ShuffleWriteMetrics
t.hostname = "localhost"
t.executorDeserializeTime = a
t.executorRunTime = b
@@ -604,9 +616,16 @@ class JsonProtocolSuite extends FunSuite {
sr.remoteBlocksFetched = f
t.setShuffleReadMetrics(Some(sr))
}
- sw.shuffleBytesWritten = a + b + c
- sw.shuffleWriteTime = b + c + d
- t.shuffleWriteMetrics = Some(sw)
+ if (hasOutput) {
+ val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
+ outputMetrics.bytesWritten = a + b + c
+ t.outputMetrics = Some(outputMetrics)
+ } else {
+ val sw = new ShuffleWriteMetrics
+ sw.shuffleBytesWritten = a + b + c
+ sw.shuffleWriteTime = b + c + d
+ t.shuffleWriteMetrics = Some(sw)
+ }
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
(RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i))
@@ -946,6 +965,87 @@ class JsonProtocolSuite extends FunSuite {
|}
"""
+ private val taskEndWithOutputJsonString =
+ """
+ |{
+ | "Event": "SparkListenerTaskEnd",
+ | "Stage ID": 1,
+ | "Stage Attempt ID": 0,
+ | "Task Type": "ResultTask",
+ | "Task End Reason": {
+ | "Reason": "Success"
+ | },
+ | "Task Info": {
+ | "Task ID": 123,
+ | "Index": 234,
+ | "Attempt": 67,
+ | "Launch Time": 345,
+ | "Executor ID": "executor",
+ | "Host": "your kind sir",
+ | "Locality": "NODE_LOCAL",
+ | "Speculative": false,
+ | "Getting Result Time": 0,
+ | "Finish Time": 0,
+ | "Failed": false,
+ | "Accumulables": [
+ | {
+ | "ID": 1,
+ | "Name": "Accumulable1",
+ | "Update": "delta1",
+ | "Value": "val1"
+ | },
+ | {
+ | "ID": 2,
+ | "Name": "Accumulable2",
+ | "Update": "delta2",
+ | "Value": "val2"
+ | },
+ | {
+ | "ID": 3,
+ | "Name": "Accumulable3",
+ | "Update": "delta3",
+ | "Value": "val3"
+ | }
+ | ]
+ | },
+ | "Task Metrics": {
+ | "Host Name": "localhost",
+ | "Executor Deserialize Time": 300,
+ | "Executor Run Time": 400,
+ | "Result Size": 500,
+ | "JVM GC Time": 600,
+ | "Result Serialization Time": 700,
+ | "Memory Bytes Spilled": 800,
+ | "Disk Bytes Spilled": 0,
+ | "Input Metrics": {
+ | "Data Read Method": "Hadoop",
+ | "Bytes Read": 2100
+ | },
+ | "Output Metrics": {
+ | "Data Write Method": "Hadoop",
+ | "Bytes Written": 1200
+ | },
+ | "Updated Blocks": [
+ | {
+ | "Block ID": "rdd_0_0",
+ | "Status": {
+ | "Storage Level": {
+ | "Use Disk": true,
+ | "Use Memory": true,
+ | "Use Tachyon": false,
+ | "Deserialized": false,
+ | "Replication": 2
+ | },
+ | "Memory Size": 0,
+ | "Tachyon Size": 0,
+ | "Disk Size": 0
+ | }
+ | }
+ | ]
+ | }
+ |}
+ """
+
private val jobStartJsonString =
"""
|{