-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 3
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 3
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 180
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 165
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 23
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 1
27 files changed, 281 insertions, 246 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a06dc1ce91..dc4f289ae7 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -114,8 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
- this.writeMetrics = new ShuffleWriteMetrics();
- taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.serializer = Serializer.getSerializer(dep.serializer());
this.shuffleBlockResolver = shuffleBlockResolver;
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index c8cc705697..d3d79a27ea 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -119,8 +119,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
this.partitioner = dep.partitioner();
- this.writeMetrics = new ShuffleWriteMetrics();
- taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 68dc0c6d41..a6edc1ad3f 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -122,9 +122,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- // TODO: metrics tracking + integration with shuffle write metrics
- // need to connect the write metrics to task metrics so we count the spill IO somewhere.
- this.writeMetrics = new ShuffleWriteMetrics();
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index d92d8b0eef..fa8e2b9538 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -43,8 +43,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
- val existingMetrics = context.taskMetrics
- .getInputMetricsForReadMethod(blockResult.readMethod)
+ val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)
val iter = blockResult.data.asInstanceOf[Iterator[T]]
@@ -66,11 +65,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
-
- // Otherwise, cache the values
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)
-
} finally {
loading.synchronized {
loading.remove(key)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9b14184364..75d7e34d60 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -425,7 +425,7 @@ private[spark] class Executor(
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.foreach { metrics =>
- metrics.updateShuffleReadMetrics()
+ metrics.mergeShuffleReadMetrics()
metrics.updateInputMetrics()
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
metrics.updateAccumulators()
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index ce1fcbff71..32ef5a9b56 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -102,15 +102,38 @@ class TaskMetrics extends Serializable {
private[spark] def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value
private[spark] def decDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled -= value
- /**
- * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
- * are stored here.
- */
private var _inputMetrics: Option[InputMetrics] = None
+ /**
+ * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
+ * data, defined only in tasks with input.
+ */
def inputMetrics: Option[InputMetrics] = _inputMetrics
/**
+ * Get or create a new [[InputMetrics]] associated with this task.
+ */
+ private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = {
+ synchronized {
+ val metrics = _inputMetrics.getOrElse {
+ val metrics = new InputMetrics(readMethod)
+ _inputMetrics = Some(metrics)
+ metrics
+ }
+ // If there already exists an InputMetric with the same read method, we can just return
+ // that one. Otherwise, if the read method is different from the one previously seen by
+ // this task, we return a new dummy one to avoid clobbering the values of the old metrics.
+ // In the future we should try to store input metrics from all different read methods at
+ // the same time (SPARK-5225).
+ if (metrics.readMethod == readMethod) {
+ metrics
+ } else {
+ new InputMetrics(readMethod)
+ }
+ }
+ }
+
+ /**
* This should only be used when recreating TaskMetrics, not when updating input metrics in
* executors
*/
@@ -118,18 +141,37 @@ class TaskMetrics extends Serializable {
_inputMetrics = inputMetrics
}
+ private var _outputMetrics: Option[OutputMetrics] = None
+
/**
- * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
- * data was written are stored here.
+ * Metrics related to writing data externally (e.g. to a distributed filesystem),
+ * defined only in tasks with output.
*/
- var outputMetrics: Option[OutputMetrics] = None
+ def outputMetrics: Option[OutputMetrics] = _outputMetrics
+
+ @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
+ def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
+ _outputMetrics = om
+ }
/**
- * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
- * This includes read metrics aggregated over all the task's shuffle dependencies.
+ * Get or create a new [[OutputMetrics]] associated with this task.
*/
+ private[spark] def registerOutputMetrics(
+ writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized {
+ _outputMetrics.getOrElse {
+ val metrics = new OutputMetrics(writeMethod)
+ _outputMetrics = Some(metrics)
+ metrics
+ }
+ }
+
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+ /**
+ * Metrics related to shuffle read aggregated across all shuffle dependencies.
+ * This is defined only if there are shuffle dependencies in this task.
+ */
def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics
/**
@@ -141,66 +183,35 @@ class TaskMetrics extends Serializable {
}
/**
- * ShuffleReadMetrics per dependency for collecting independently while task is in progress.
- */
- @transient private lazy val depsShuffleReadMetrics: ArrayBuffer[ShuffleReadMetrics] =
- new ArrayBuffer[ShuffleReadMetrics]()
-
- /**
- * If this task writes to shuffle output, metrics on the written shuffle data will be collected
- * here
- */
- var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
-
- /**
- * Storage statuses of any blocks that have been updated as a result of this task.
- */
- var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
-
- /**
+ * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
+ *
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
- * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
- * dependency, and merge these metrics before reporting them to the driver. This method returns
- * a ShuffleReadMetrics for a dependency and registers it for merging later.
- */
- private [spark] def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
- val readMetrics = new ShuffleReadMetrics()
- depsShuffleReadMetrics += readMetrics
- readMetrics
- }
+ * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
+ * each dependency and merge these metrics before reporting them to the driver.
+ */
+ @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]
/**
- * Returns the input metrics object that the task should use. Currently, if
- * there exists an input metric with the same readMethod, we return that one
- * so the caller can accumulate bytes read. If the readMethod is different
- * than previously seen by this task, we return a new InputMetric but don't
- * record it.
+ * Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency.
*
- * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
- * we can store all the different inputMetrics (one per readMethod).
+ * All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which
+ * merges the temporary values synchronously. Otherwise, all temporary data collected will
+ * be lost.
*/
- private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): InputMetrics = {
- synchronized {
- _inputMetrics match {
- case None =>
- val metrics = new InputMetrics(readMethod)
- _inputMetrics = Some(metrics)
- metrics
- case Some(metrics @ InputMetrics(method)) if method == readMethod =>
- metrics
- case Some(InputMetrics(method)) =>
- new InputMetrics(readMethod)
- }
- }
+ private[spark] def registerTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
+ val readMetrics = new ShuffleReadMetrics
+ tempShuffleReadMetrics += readMetrics
+ readMetrics
}
/**
- * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
+ * Merge values across all temporary [[ShuffleReadMetrics]] into `_shuffleReadMetrics`.
+ * This is expected to be called on executor heartbeat and at the end of a task.
*/
- private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
- if (!depsShuffleReadMetrics.isEmpty) {
- val merged = new ShuffleReadMetrics()
- for (depMetrics <- depsShuffleReadMetrics) {
+ private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
+ if (tempShuffleReadMetrics.nonEmpty) {
+ val merged = new ShuffleReadMetrics
+ for (depMetrics <- tempShuffleReadMetrics) {
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
@@ -212,6 +223,55 @@ class TaskMetrics extends Serializable {
}
}
+ private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
+
+ /**
+ * Metrics related to shuffle write, defined only in shuffle map stages.
+ */
+ def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
+
+ @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
+ def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
+ _shuffleWriteMetrics = swm
+ }
+
+ /**
+ * Get or create a new [[ShuffleWriteMetrics]] associated with this task.
+ */
+ private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized {
+ _shuffleWriteMetrics.getOrElse {
+ val metrics = new ShuffleWriteMetrics
+ _shuffleWriteMetrics = Some(metrics)
+ metrics
+ }
+ }
+
+ private var _updatedBlockStatuses: Seq[(BlockId, BlockStatus)] =
+ Seq.empty[(BlockId, BlockStatus)]
+
+ /**
+ * Storage statuses of any blocks that have been updated as a result of this task.
+ */
+ def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses
+
+ @deprecated("setting updated blocks is for internal use only", "2.0.0")
+ def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
+ _updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)])
+ }
+
+ private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
+ _updatedBlockStatuses ++= v
+ }
+
+ private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
+ _updatedBlockStatuses = v
+ }
+
+ @deprecated("use updatedBlockStatuses instead", "2.0.0")
+ def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
+ if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None
+ }
+
private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
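
The TaskMetrics hunks above replace writable Option fields with get-or-create accessors. As a reading aid, here is a minimal Scala sketch, not part of the patch, of how task-side code uses the new API. It assumes the caller lives under package org.apache.spark, since registerInputMetrics, registerOutputMetrics, registerTempShuffleReadMetrics, registerShuffleWriteMetrics, mergeShuffleReadMetrics and incUpdatedBlockStatuses are private[spark]; the TaskContext and all values are placeholders.

    import org.apache.spark.TaskContext
    import org.apache.spark.executor.{DataReadMethod, DataWriteMethod}
    import org.apache.spark.storage.{BlockStatus, RDDBlockId, StorageLevel}

    def recordExampleMetrics(context: TaskContext): Unit = {
      val tm = context.taskMetrics()

      // Input/output metrics are created on demand instead of assigned as Options.
      val inputMetrics = tm.registerInputMetrics(DataReadMethod.Hadoop)
      inputMetrics.incBytesRead(1024L)
      val outputMetrics = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
      outputMetrics.setBytesWritten(2048L)

      // One temporary ShuffleReadMetrics per shuffle dependency, merged into the
      // task-level aggregate before the values are reported.
      val readMetrics = tm.registerTempShuffleReadMetrics()
      readMetrics.incRecordsRead(1L)
      tm.mergeShuffleReadMetrics()

      // All shuffle writers in the task share a single ShuffleWriteMetrics.
      val writeMetrics = tm.registerShuffleWriteMetrics()
      writeMetrics.incBytesWritten(512L)

      // Updated block statuses are now a plain Seq rather than an Option.
      tm.incUpdatedBlockStatuses(
        Seq((RDDBlockId(0, 0), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L))))
    }

The key point is that callers no longer construct metrics objects or assign Options themselves; they ask TaskMetrics for the object and mutate it in place, which is the pattern the per-file changes below follow.
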
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a7a6e0b8a9..a79ab86d49 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -212,7 +212,7 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
// Sets the thread local variable for the file's name
split.inputSplit.value match {
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 7a11978304..5cc9c81cc6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -129,8 +129,7 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = getConf
- val inputMetrics = context.taskMetrics
- .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 16a856f594..33f2f0b44f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1092,7 +1092,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
- val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
+ val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+ initHadoopOutputMetrics(context)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
require(writer != null, "Unable to obtain RecordWriter")
@@ -1103,15 +1104,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.write(pair._1, pair._2)
// Update bytes written metric every few records
- maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+ maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}
} {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
- bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
- outputMetrics.setRecordsWritten(recordsWritten)
+ outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+ om.setBytesWritten(callback())
+ om.setRecordsWritten(recordsWritten)
+ }
1
} : Int
@@ -1177,7 +1180,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
- val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
+ val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+ initHadoopOutputMetrics(context)
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1189,35 +1193,43 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
// Update bytes written metric every few records
- maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+ maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
recordsWritten += 1
}
} {
writer.close()
}
writer.commit()
- bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
- outputMetrics.setRecordsWritten(recordsWritten)
+ outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+ om.setBytesWritten(callback())
+ om.setRecordsWritten(recordsWritten)
+ }
}
self.context.runJob(self, writeToFile)
writer.commitJob()
}
- private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = {
+ // TODO: these don't seem like the right abstractions.
+ // We should abstract the duplicate code in a less awkward way.
+
+ // return type: (output metrics, bytes written callback), defined only if the latter is defined
+ private def initHadoopOutputMetrics(
+ context: TaskContext): Option[(OutputMetrics, () => Long)] = {
val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
- val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
- if (bytesWrittenCallback.isDefined) {
- context.taskMetrics.outputMetrics = Some(outputMetrics)
+ bytesWrittenCallback.map { b =>
+ (context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop), b)
}
- (outputMetrics, bytesWrittenCallback)
}
- private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
- outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
+ private def maybeUpdateOutputMetrics(
+ outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
+ recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
- bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
- outputMetrics.setRecordsWritten(recordsWritten)
+ outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+ om.setBytesWritten(callback())
+ om.setRecordsWritten(recordsWritten)
+ }
}
}
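
As a usage note, the following self-contained sketch, not taken from the patch, shows the caller-side shape of the new initHadoopOutputMetrics/maybeUpdateOutputMetrics pattern: the OutputMetrics and the bytes-written callback now travel together in a single Option, so metric updates are skipped entirely when the filesystem exposes no callback. The record iterator, the writeRecord function and the 256-record interval are hypothetical stand-ins, and registerOutputMetrics is private[spark], so this assumes code under package org.apache.spark.

    import org.apache.spark.TaskContext
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}

    def writeWithMetrics(
        context: TaskContext,
        records: Iterator[(String, String)],
        writeRecord: ((String, String)) => Unit): Unit = {
      // Metrics are registered only if the filesystem provides a bytes-written callback.
      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
        SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().map { callback =>
          (context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop), callback)
        }
      var recordsWritten = 0L
      while (records.hasNext) {
        writeRecord(records.next())
        recordsWritten += 1
        // Update bytes written every few records, mirroring PairRDDFunctions.
        if (recordsWritten % 256 == 0) {
          outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
            om.setBytesWritten(callback())
            om.setRecordsWritten(recordsWritten)
          }
        }
      }
      // Final update once all records are written.
      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
        om.setBytesWritten(callback())
        om.setRecordsWritten(recordsWritten)
      }
    }
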
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index b0abda4a81..a57e5b0bfb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -65,13 +65,13 @@ private[spark] class BlockStoreShuffleReader[K, C](
}
// Update the context task metrics for each record read.
- val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+ val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map(record => {
readMetrics.incRecordsRead(1)
record
}),
- context.taskMetrics().updateShuffleReadMetrics())
+ context.taskMetrics().mergeShuffleReadMetrics())
// An interruptible iterator must be used here in order to support task cancellation
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 412bf70000..28bcced901 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -42,8 +42,7 @@ private[spark] class HashShuffleWriter[K, V](
// we don't try deleting files, etc twice.
private var stopping = false
- private val writeMetrics = new ShuffleWriteMetrics()
- metrics.shuffleWriteMetrics = Some(writeMetrics)
+ private val writeMetrics = metrics.registerShuffleWriteMetrics()
private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 5c5a5f5a4c..7eb3d96037 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -45,8 +45,7 @@ private[spark] class SortShuffleWriter[K, V, C](
private var mapStatus: MapStatus = null
- private val writeMetrics = new ShuffleWriteMetrics()
- context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)
+ private val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics()
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
@@ -93,8 +92,7 @@ private[spark] class SortShuffleWriter[K, V, C](
if (sorter != null) {
val startTime = System.nanoTime()
sorter.stop()
- context.taskMetrics.shuffleWriteMetrics.foreach(
- _.incWriteTime(System.nanoTime - startTime))
+ writeMetrics.incWriteTime(System.nanoTime - startTime)
sorter = null
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e0a8e88df2..77fd03a6bc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -800,10 +800,8 @@ private[spark] class BlockManager(
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
- Option(TaskContext.get()).foreach { taskContext =>
- val metrics = taskContext.taskMetrics()
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, putBlockStatus)))
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
}
}
} finally {
@@ -1046,10 +1044,8 @@ private[spark] class BlockManager(
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
- Option(TaskContext.get()).foreach { taskContext =>
- val metrics = taskContext.taskMetrics()
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 037bec1d9c..c6065df64a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -101,7 +101,7 @@ final class ShuffleBlockFetcherIterator(
/** Current bytes in flight from our requests */
private[this] var bytesInFlight = 0L
- private[this] val shuffleMetrics = context.taskMetrics().createShuffleReadMetricsForDependency()
+ private[this] val shuffleMetrics = context.taskMetrics().registerTempShuffleReadMetrics()
/**
* Whether the iterator is still active. If isZombie is true, the callback interface will no
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index ec711480eb..d98aae8ff0 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -63,7 +63,7 @@ class StorageStatusListener extends SparkListener {
val info = taskEnd.taskInfo
val metrics = taskEnd.taskMetrics
if (info != null && metrics != null) {
- val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ val updatedBlocks = metrics.updatedBlockStatuses
if (updatedBlocks.length > 0) {
updateStorageStatus(info.executorId, updatedBlocks)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 2d9b885c68..f1e28b4e1e 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -63,8 +63,8 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
*/
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val metrics = taskEnd.taskMetrics
- if (metrics != null && metrics.updatedBlocks.isDefined) {
- updateRDDInfo(metrics.updatedBlocks.get)
+ if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) {
+ updateRDDInfo(metrics.updatedBlockStatuses)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b88221a249..efa22b9993 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -292,21 +292,38 @@ private[spark] object JsonProtocol {
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
- val shuffleReadMetrics =
- taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
- val shuffleWriteMetrics =
- taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
- val inputMetrics =
- taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
- val outputMetrics =
- taskMetrics.outputMetrics.map(outputMetricsToJson).getOrElse(JNothing)
- val updatedBlocks =
- taskMetrics.updatedBlocks.map { blocks =>
- JArray(blocks.toList.map { case (id, status) =>
- ("Block ID" -> id.toString) ~
- ("Status" -> blockStatusToJson(status))
- })
+ val shuffleReadMetrics: JValue =
+ taskMetrics.shuffleReadMetrics.map { rm =>
+ ("Remote Blocks Fetched" -> rm.remoteBlocksFetched) ~
+ ("Local Blocks Fetched" -> rm.localBlocksFetched) ~
+ ("Fetch Wait Time" -> rm.fetchWaitTime) ~
+ ("Remote Bytes Read" -> rm.remoteBytesRead) ~
+ ("Local Bytes Read" -> rm.localBytesRead) ~
+ ("Total Records Read" -> rm.recordsRead)
+ }.getOrElse(JNothing)
+ val shuffleWriteMetrics: JValue =
+ taskMetrics.shuffleWriteMetrics.map { wm =>
+ ("Shuffle Bytes Written" -> wm.shuffleBytesWritten) ~
+ ("Shuffle Write Time" -> wm.shuffleWriteTime) ~
+ ("Shuffle Records Written" -> wm.shuffleRecordsWritten)
+ }.getOrElse(JNothing)
+ val inputMetrics: JValue =
+ taskMetrics.inputMetrics.map { im =>
+ ("Data Read Method" -> im.readMethod.toString) ~
+ ("Bytes Read" -> im.bytesRead) ~
+ ("Records Read" -> im.recordsRead)
+ }.getOrElse(JNothing)
+ val outputMetrics: JValue =
+ taskMetrics.outputMetrics.map { om =>
+ ("Data Write Method" -> om.writeMethod.toString) ~
+ ("Bytes Written" -> om.bytesWritten) ~
+ ("Records Written" -> om.recordsWritten)
}.getOrElse(JNothing)
+ val updatedBlocks =
+ JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
+ ("Block ID" -> id.toString) ~
+ ("Status" -> blockStatusToJson(status))
+ })
("Host Name" -> taskMetrics.hostname) ~
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~
@@ -322,34 +339,6 @@ private[spark] object JsonProtocol {
("Updated Blocks" -> updatedBlocks)
}
- def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
- ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
- ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
- ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
- ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
- ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
- ("Total Records Read" -> shuffleReadMetrics.recordsRead)
- }
-
- // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
- def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
- ("Shuffle Bytes Written" -> shuffleWriteMetrics.bytesWritten) ~
- ("Shuffle Write Time" -> shuffleWriteMetrics.writeTime) ~
- ("Shuffle Records Written" -> shuffleWriteMetrics.recordsWritten)
- }
-
- def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
- ("Data Read Method" -> inputMetrics.readMethod.toString) ~
- ("Bytes Read" -> inputMetrics.bytesRead) ~
- ("Records Read" -> inputMetrics.recordsRead)
- }
-
- def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = {
- ("Data Write Method" -> outputMetrics.writeMethod.toString) ~
- ("Bytes Written" -> outputMetrics.bytesWritten) ~
- ("Records Written" -> outputMetrics.recordsWritten)
- }
-
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
val reason = Utils.getFormattedClassName(taskEndReason)
val json: JObject = taskEndReason match {
@@ -721,58 +710,54 @@ private[spark] object JsonProtocol {
metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
- metrics.setShuffleReadMetrics(
- Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
- metrics.shuffleWriteMetrics =
- Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
- metrics.setInputMetrics(
- Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
- metrics.outputMetrics =
- Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
- metrics.updatedBlocks =
- Utils.jsonOption(json \ "Updated Blocks").map { value =>
- value.extract[List[JValue]].map { block =>
- val id = BlockId((block \ "Block ID").extract[String])
- val status = blockStatusFromJson(block \ "Status")
- (id, status)
- }
- }
- metrics
- }
- def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
- val metrics = new ShuffleReadMetrics
- metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int])
- metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
- metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
- metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
- metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0))
- metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0))
- metrics
- }
+ // Shuffle read metrics
+ Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
+ val readMetrics = metrics.registerTempShuffleReadMetrics()
+ readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
+ readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
+ readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
+ readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L))
+ readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
+ readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L))
+ metrics.mergeShuffleReadMetrics()
+ }
- def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
- val metrics = new ShuffleWriteMetrics
- metrics.incBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
- metrics.incWriteTime((json \ "Shuffle Write Time").extract[Long])
- metrics.setRecordsWritten((json \ "Shuffle Records Written")
- .extractOpt[Long].getOrElse(0))
- metrics
- }
+ // Shuffle write metrics
+ // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
+ Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
+ val writeMetrics = metrics.registerShuffleWriteMetrics()
+ writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
+ writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
+ .extractOpt[Long].getOrElse(0L))
+ writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
+ }
- def inputMetricsFromJson(json: JValue): InputMetrics = {
- val metrics = new InputMetrics(
- DataReadMethod.withName((json \ "Data Read Method").extract[String]))
- metrics.incBytesRead((json \ "Bytes Read").extract[Long])
- metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0))
- metrics
- }
+ // Output metrics
+ Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
+ val writeMethod = DataWriteMethod.withName((outJson \ "Data Write Method").extract[String])
+ val outputMetrics = metrics.registerOutputMetrics(writeMethod)
+ outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
+ outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L))
+ }
+
+ // Input metrics
+ Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
+ val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String])
+ val inputMetrics = metrics.registerInputMetrics(readMethod)
+ inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
+ inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+ }
+
+ // Updated blocks
+ Utils.jsonOption(json \ "Updated Blocks").foreach { blocksJson =>
+ metrics.setUpdatedBlockStatuses(blocksJson.extract[List[JValue]].map { blockJson =>
+ val id = BlockId((blockJson \ "Block ID").extract[String])
+ val status = blockStatusFromJson(blockJson \ "Status")
+ (id, status)
+ })
+ }
- def outputMetricsFromJson(json: JValue): OutputMetrics = {
- val metrics = new OutputMetrics(
- DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
- metrics.setBytesWritten((json \ "Bytes Written").extract[Long])
- metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(0))
metrics
}
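
For completeness, a short sketch, not part of the patch, of the round trip the rewritten serialization code supports. It assumes spark-internal access, the existing private[spark] entry points JsonProtocol.taskMetricsToJson and JsonProtocol.taskMetricsFromJson, and the existing setHostname setter on TaskMetrics; the byte counts are placeholders.

    import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
    import org.apache.spark.util.JsonProtocol

    // Build metrics through the new registration API, serialize, then read them back.
    val tm = new TaskMetrics
    tm.setHostname("localhost")                     // existing private[spark] setter
    val inputMetrics = tm.registerInputMetrics(DataReadMethod.Hadoop)
    inputMetrics.incBytesRead(4096L)

    val json = JsonProtocol.taskMetricsToJson(tm)   // now inlines the "Input Metrics" fields
    val restored = JsonProtocol.taskMetricsFromJson(json)
    assert(restored.inputMetrics.map(_.bytesRead) == Some(4096L))
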
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 4c7416e00b..df9e0502e7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -644,6 +644,8 @@ private[spark] class ExternalSorter[K, V, C](
blockId: BlockId,
outputFile: File): Array[Long] = {
+ val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics()
+
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
@@ -652,8 +654,8 @@ private[spark] class ExternalSorter[K, V, C](
val collection = if (aggregator.isDefined) map else buffer
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
- val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
- context.taskMetrics.shuffleWriteMetrics.get)
+ val writer = blockManager.getDiskWriter(
+ blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
@@ -666,8 +668,8 @@ private[spark] class ExternalSorter[K, V, C](
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
- val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
- context.taskMetrics.shuffleWriteMetrics.get)
+ val writer = blockManager.getDiskWriter(
+ blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 3865c201bf..48a0282b30 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -88,7 +88,7 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
try {
TaskContext.setTaskContext(context)
cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
- assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+ assert(context.taskMetrics.updatedBlockStatuses.size === 2)
} finally {
TaskContext.unset()
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 8275fd8776..e5ec2aa1be 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SparkFunSuite
class TaskMetricsSuite extends SparkFunSuite {
test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") {
val taskMetrics = new TaskMetrics()
- taskMetrics.updateShuffleReadMetrics()
+ taskMetrics.mergeShuffleReadMetrics()
assert(taskMetrics.shuffleReadMetrics.isEmpty)
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6e6cf6385f..e1b2c9633e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -855,7 +855,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
} finally {
TaskContext.unset()
}
- context.taskMetrics.updatedBlocks.getOrElse(Seq.empty)
+ context.taskMetrics.updatedBlockStatuses
}
// 1 updated block (i.e. list1)
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 355d80d068..9de434166b 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -85,8 +85,8 @@ class StorageStatusListenerSuite extends SparkFunSuite {
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
- taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
- taskMetrics2.updatedBlocks = Some(Seq(block3))
+ taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
+ taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
// Task end with new blocks
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
@@ -108,8 +108,8 @@ class StorageStatusListenerSuite extends SparkFunSuite {
val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L))
val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L))
val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L))
- taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
- taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
+ taskMetrics1.setUpdatedBlockStatuses(Seq(droppedBlock1, droppedBlock3))
+ taskMetrics2.setUpdatedBlockStatuses(Seq(droppedBlock2, droppedBlock3))
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
@@ -133,8 +133,8 @@ class StorageStatusListenerSuite extends SparkFunSuite {
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
- taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
- taskMetrics2.updatedBlocks = Some(Seq(block3))
+ taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
+ taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index ee2d56a679..607617cbe9 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -184,12 +184,12 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
- val shuffleReadMetrics = new ShuffleReadMetrics()
+ val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.incRemoteBytesRead(1000)
- taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
+ taskMetrics.mergeShuffleReadMetrics()
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0)
@@ -270,22 +270,19 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
def makeTaskMetrics(base: Int): TaskMetrics = {
val taskMetrics = new TaskMetrics()
- val shuffleReadMetrics = new ShuffleReadMetrics()
- val shuffleWriteMetrics = new ShuffleWriteMetrics()
- taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
- taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
+ taskMetrics.setExecutorRunTime(base + 4)
+ taskMetrics.incDiskBytesSpilled(base + 5)
+ taskMetrics.incMemoryBytesSpilled(base + 6)
+ val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
+ taskMetrics.mergeShuffleReadMetrics()
+ val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
shuffleWriteMetrics.incBytesWritten(base + 3)
- taskMetrics.setExecutorRunTime(base + 4)
- taskMetrics.incDiskBytesSpilled(base + 5)
- taskMetrics.incMemoryBytesSpilled(base + 6)
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- taskMetrics.setInputMetrics(Some(inputMetrics))
+ val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.incBytesRead(base + 7)
- val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
- taskMetrics.outputMetrics = Some(outputMetrics)
+ val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.setBytesWritten(base + 8)
taskMetrics
}
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 5ac922c217..d1dbf7c155 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -127,7 +127,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
// Task end with a few new persisted blocks, some from the same RDD
val metrics1 = new TaskMetrics
- metrics1.updatedBlocks = Some(Seq(
+ metrics1.setUpdatedBlockStatuses(Seq(
(RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
(RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)),
(RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L))
@@ -146,7 +146,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
// Task end with a few dropped blocks
val metrics2 = new TaskMetrics
- metrics2.updatedBlocks = Some(Seq(
+ metrics2.setUpdatedBlockStatuses(Seq(
(RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)),
(RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)),
(RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist
@@ -173,8 +173,8 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
val taskMetrics1 = new TaskMetrics
val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L))
val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L))
- taskMetrics0.updatedBlocks = Some(Seq(block0))
- taskMetrics1.updatedBlocks = Some(Seq(block1))
+ taskMetrics0.setUpdatedBlockStatuses(Seq(block0))
+ taskMetrics1.setUpdatedBlockStatuses(Seq(block1))
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener.rddInfoList.size === 0)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 9dd400fc1c..e5ca2de4ad 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -557,7 +557,7 @@ class JsonProtocolSuite extends SparkFunSuite {
metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
assertOptionEquals(
metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
- assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
+ assertBlocksEquals(metrics1.updatedBlockStatuses, metrics2.updatedBlockStatuses)
}
private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
@@ -773,34 +773,31 @@ class JsonProtocolSuite extends SparkFunSuite {
t.incMemoryBytesSpilled(a + c)
if (hasHadoopInput) {
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.incBytesRead(d + e + f)
inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
- t.setInputMetrics(Some(inputMetrics))
} else {
- val sr = new ShuffleReadMetrics
+ val sr = t.registerTempShuffleReadMetrics()
sr.incRemoteBytesRead(b + d)
sr.incLocalBlocksFetched(e)
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
sr.incLocalBytesRead(a + f)
- t.setShuffleReadMetrics(Some(sr))
+ t.mergeShuffleReadMetrics()
}
if (hasOutput) {
- val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
+ val outputMetrics = t.registerOutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.setBytesWritten(a + b + c)
outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1)
- t.outputMetrics = Some(outputMetrics)
} else {
- val sw = new ShuffleWriteMetrics
+ val sw = t.registerShuffleWriteMetrics()
sw.incBytesWritten(a + b + c)
sw.incWriteTime(b + c + d)
sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
- t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks
- t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
+ t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
(RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i))
}.toSeq)
t
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index d45d2db62f..8222b84d33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -126,8 +126,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = getConf(isDriverSide = false)
- val inputMetrics = context.taskMetrics
- .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
// Sets the thread local variable for the file's name
split.serializableHadoopSplit.value match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 9f09eb4429..7438e11ef7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -127,7 +127,6 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
assert(sorter.numSpills > 0)
// Merging spilled files should not throw assertion error
- taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
} {
// Clean up