From a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 15 Jan 2015 18:48:39 -0800 Subject: [SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds When calculating the input metrics there was an assumption that one task only reads from one block - this is not true for some operations including coalesce. This patch simply increments the task's input metrics if previous ones existed of the same read method. A limitation to this patch is that if a task reads from two different blocks of different read methods, one will override the other. Author: Kostas Sakellis Closes #3120 from ksakellis/kostas-spark-4092 and squashes the following commits: 54e6658 [Kostas Sakellis] Drops metrics if conflicting read methods exist f0e0cc5 [Kostas Sakellis] Add bytesReadCallback to InputMetrics a2a36d4 [Kostas Sakellis] CR feedback 5a0c770 [Kostas Sakellis] [SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds --- .../main/scala/org/apache/spark/CacheManager.scala | 6 +- .../scala/org/apache/spark/executor/Executor.scala | 1 + .../org/apache/spark/executor/TaskMetrics.scala | 75 +++++++- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 39 ++--- .../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 40 ++--- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../scala/org/apache/spark/util/JsonProtocol.scala | 6 +- .../spark/metrics/InputOutputMetricsSuite.scala | 195 +++++++++++++++++---- .../spark/ui/jobs/JobProgressListenerSuite.scala | 4 +- .../org/apache/spark/util/JsonProtocolSuite.scala | 4 +- 10 files changed, 270 insertions(+), 102 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 80da62c44e..a0c0372b7f 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { blockManager.get(key) match { case Some(blockResult) => // Partition is already materialized, so just return its values - context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics) + val inputMetrics = blockResult.inputMetrics + val existingMetrics = context.taskMetrics + .getInputMetricsForReadMethod(inputMetrics.readMethod) + existingMetrics.addBytesRead(inputMetrics.bytesRead) + new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) case None => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b75c77b5b4..6660b98eb8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -379,6 +379,7 @@ private[spark] class Executor( if (!taskRunner.attemptedTask.isEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => metrics.updateShuffleReadMetrics + metrics.updateInputMetrics() metrics.jvmGCTime = curGCTime - taskRunner.startGCTime if (isLocal) { // JobProgressListener will hold an reference of it during diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 51b5328cb4..7eb10f95e0 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,6 +17,11 @@ package org.apache.spark.executor +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.executor.DataReadMethod +import org.apache.spark.executor.DataReadMethod.DataReadMethod + import scala.collection.mutable.ArrayBuffer import org.apache.spark.annotation.DeveloperApi @@ -80,7 +85,17 @@ class TaskMetrics extends Serializable { * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read * are stored here. */ - var inputMetrics: Option[InputMetrics] = None + private var _inputMetrics: Option[InputMetrics] = None + + def inputMetrics = _inputMetrics + + /** + * This should only be used when recreating TaskMetrics, not when updating input metrics in + * executors + */ + private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) { + _inputMetrics = inputMetrics + } /** * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much @@ -133,6 +148,30 @@ class TaskMetrics extends Serializable { readMetrics } + /** + * Returns the input metrics object that the task should use. Currently, if + * there exists an input metric with the same readMethod, we return that one + * so the caller can accumulate bytes read. If the readMethod is different + * than previously seen by this task, we return a new InputMetric but don't + * record it. + * + * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed, + * we can store all the different inputMetrics (one per readMethod). + */ + private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): + InputMetrics =synchronized { + _inputMetrics match { + case None => + val metrics = new InputMetrics(readMethod) + _inputMetrics = Some(metrics) + metrics + case Some(metrics @ InputMetrics(method)) if method == readMethod => + metrics + case Some(InputMetrics(method)) => + new InputMetrics(readMethod) + } + } + /** * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics. */ @@ -146,6 +185,10 @@ class TaskMetrics extends Serializable { } _shuffleReadMetrics = Some(merged) } + + private[spark] def updateInputMetrics() = synchronized { + inputMetrics.foreach(_.updateBytesRead()) + } } private[spark] object TaskMetrics { @@ -179,10 +222,38 @@ object DataWriteMethod extends Enumeration with Serializable { */ @DeveloperApi case class InputMetrics(readMethod: DataReadMethod.Value) { + + private val _bytesRead: AtomicLong = new AtomicLong() + /** * Total bytes read. */ - var bytesRead: Long = 0L + def bytesRead: Long = _bytesRead.get() + @volatile @transient var bytesReadCallback: Option[() => Long] = None + + /** + * Adds additional bytes read for this read method. + */ + def addBytesRead(bytes: Long) = { + _bytesRead.addAndGet(bytes) + } + + /** + * Invoke the bytesReadCallback and mutate bytesRead. + */ + def updateBytesRead() { + bytesReadCallback.foreach { c => + _bytesRead.set(c()) + } + } + + /** + * Register a function that can be called to get up-to-date information on how many bytes the task + * has read from an input source. + */ + def setBytesReadCallback(f: Option[() => Long]) { + bytesReadCallback = f + } } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 37e0c13029..3b99d3a6ca 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -213,18 +213,19 @@ class HadoopRDD[K, V]( logInfo("Input split: " + split.inputSplit) val jobConf = getJobConf() - val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + val inputMetrics = context.taskMetrics + .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes - val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) { - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback( - split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf) - } else { - None - } - if (bytesReadCallback.isDefined) { - context.taskMetrics.inputMetrics = Some(inputMetrics) - } + val bytesReadCallback = inputMetrics.bytesReadCallback.orElse( + split.inputSplit.value match { + case split: FileSplit => + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf) + case _ => None + } + ) + inputMetrics.setBytesReadCallback(bytesReadCallback) var reader: RecordReader[K, V] = null val inputFormat = getInputFormat(jobConf) @@ -237,8 +238,6 @@ class HadoopRDD[K, V]( val key: K = reader.createKey() val value: V = reader.createValue() - var recordsSinceMetricsUpdate = 0 - override def getNext() = { try { finished = !reader.next(key, value) @@ -246,16 +245,6 @@ class HadoopRDD[K, V]( case eof: EOFException => finished = true } - - // Update bytes read metric every few records - if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES - && bytesReadCallback.isDefined) { - recordsSinceMetricsUpdate = 0 - val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() - } else { - recordsSinceMetricsUpdate += 1 - } (key, value) } @@ -263,14 +252,12 @@ class HadoopRDD[K, V]( try { reader.close() if (bytesReadCallback.isDefined) { - val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() + inputMetrics.updateBytesRead() } else if (split.inputSplit.value.isInstanceOf[FileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. try { - inputMetrics.bytesRead = split.inputSplit.value.getLength - context.taskMetrics.inputMetrics = Some(inputMetrics) + inputMetrics.addBytesRead(split.inputSplit.value.getLength) } catch { case e: java.io.IOException => logWarning("Unable to get input size to set InputMetrics for task", e) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index e55d03d391..890ec677c2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -109,18 +109,19 @@ class NewHadoopRDD[K, V]( logInfo("Input split: " + split.serializableHadoopSplit) val conf = confBroadcast.value.value - val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + val inputMetrics = context.taskMetrics + .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes - val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) { - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback( - split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf) - } else { - None - } - if (bytesReadCallback.isDefined) { - context.taskMetrics.inputMetrics = Some(inputMetrics) - } + val bytesReadCallback = inputMetrics.bytesReadCallback.orElse( + split.serializableHadoopSplit.value match { + case split: FileSplit => + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf) + case _ => None + } + ) + inputMetrics.setBytesReadCallback(bytesReadCallback) val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) @@ -153,34 +154,19 @@ class NewHadoopRDD[K, V]( throw new java.util.NoSuchElementException("End of stream") } havePair = false - - // Update bytes read metric every few records - if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES - && bytesReadCallback.isDefined) { - recordsSinceMetricsUpdate = 0 - val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() - } else { - recordsSinceMetricsUpdate += 1 - } - (reader.getCurrentKey, reader.getCurrentValue) } private def close() { try { reader.close() - - // Update metrics with final amount if (bytesReadCallback.isDefined) { - val bytesReadFn = bytesReadCallback.get - inputMetrics.bytesRead = bytesReadFn() + inputMetrics.updateBytesRead() } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. try { - inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength - context.taskMetrics.inputMetrics = Some(inputMetrics) + inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength) } catch { case e: java.io.IOException => logWarning("Unable to get input size to set InputMetrics for task", e) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1427305d91..8bc5a1cd18 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -53,7 +53,7 @@ private[spark] class BlockResult( readMethod: DataReadMethod.Value, bytes: Long) { val inputMetrics = new InputMetrics(readMethod) - inputMetrics.bytesRead = bytes + inputMetrics.addBytesRead(bytes) } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index a025011006..ee3756c226 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -637,8 +637,8 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) - metrics.inputMetrics = - Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson) + metrics.setInputMetrics( + Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)) metrics.outputMetrics = Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson) metrics.updatedBlocks = @@ -671,7 +671,7 @@ private[spark] object JsonProtocol { def inputMetricsFromJson(json: JValue): InputMetrics = { val metrics = new InputMetrics( DataReadMethod.withName((json \ "Data Read Method").extract[String])) - metrics.bytesRead = (json \ "Bytes Read").extract[Long] + metrics.addBytesRead((json \ "Bytes Read").extract[Long]) metrics } diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index f8bcde12a3..10a39990f8 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -17,66 +17,185 @@ package org.apache.spark.metrics -import java.io.{FileWriter, PrintWriter, File} +import java.io.{File, FileWriter, PrintWriter} -import org.apache.spark.SharedSparkContext -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener} +import scala.collection.mutable.ArrayBuffer import org.scalatest.FunSuite -import org.scalatest.Matchers import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} -import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SharedSparkContext +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import org.apache.spark.util.Utils + +class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { -class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Matchers { - test("input metrics when reading text file with single split") { - val file = new File(getClass.getSimpleName + ".txt") - val pw = new PrintWriter(new FileWriter(file)) - pw.println("some stuff") - pw.println("some other stuff") - pw.println("yet more stuff") - pw.println("too much stuff") + @transient var tmpDir: File = _ + @transient var tmpFile: File = _ + @transient var tmpFilePath: String = _ + + override def beforeAll() { + super.beforeAll() + + tmpDir = Utils.createTempDir() + val testTempDir = new File(tmpDir, "test") + testTempDir.mkdir() + + tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt") + val pw = new PrintWriter(new FileWriter(tmpFile)) + for (x <- 1 to 1000000) { + pw.println("s") + } pw.close() - file.deleteOnExit() - val taskBytesRead = new ArrayBuffer[Long]() - sc.addSparkListener(new SparkListener() { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead - } - }) - sc.textFile("file://" + file.getAbsolutePath, 2).count() + // Path to tmpFile + tmpFilePath = "file://" + tmpFile.getAbsolutePath + } - // Wait for task end events to come in - sc.listenerBus.waitUntilEmpty(500) - assert(taskBytesRead.length == 2) - assert(taskBytesRead.sum >= file.length()) + override def afterAll() { + super.afterAll() + Utils.deleteRecursively(tmpDir) } - test("input metrics when reading text file with multiple splits") { - val file = new File(getClass.getSimpleName + ".txt") - val pw = new PrintWriter(new FileWriter(file)) - for (i <- 0 until 10000) { - pw.println("some stuff") + test("input metrics for old hadoop with coalesce") { + val bytesRead = runAndReturnBytesRead { + sc.textFile(tmpFilePath, 4).count() + } + val bytesRead2 = runAndReturnBytesRead { + sc.textFile(tmpFilePath, 4).coalesce(2).count() + } + assert(bytesRead != 0) + assert(bytesRead == bytesRead2) + assert(bytesRead2 >= tmpFile.length()) + } + + test("input metrics with cache and coalesce") { + // prime the cache manager + val rdd = sc.textFile(tmpFilePath, 4).cache() + rdd.collect() + + val bytesRead = runAndReturnBytesRead { + rdd.count() + } + val bytesRead2 = runAndReturnBytesRead { + rdd.coalesce(4).count() } - pw.close() - file.deleteOnExit() + // for count and coelesce, the same bytes should be read. + assert(bytesRead != 0) + assert(bytesRead2 == bytesRead) + } + + /** + * This checks the situation where we have interleaved reads from + * different sources. Currently, we only accumulate fron the first + * read method we find in the task. This test uses cartesian to create + * the interleaved reads. + * + * Once https://issues.apache.org/jira/browse/SPARK-5225 is fixed + * this test should break. + */ + test("input metrics with mixed read method") { + // prime the cache manager + val numPartitions = 2 + val rdd = sc.parallelize(1 to 100, numPartitions).cache() + rdd.collect() + + val rdd2 = sc.textFile(tmpFilePath, numPartitions) + + val bytesRead = runAndReturnBytesRead { + rdd.count() + } + val bytesRead2 = runAndReturnBytesRead { + rdd2.count() + } + + val cartRead = runAndReturnBytesRead { + rdd.cartesian(rdd2).count() + } + + assert(cartRead != 0) + assert(bytesRead != 0) + // We read from the first rdd of the cartesian once per partition. + assert(cartRead == bytesRead * numPartitions) + } + + test("input metrics for new Hadoop API with coalesce") { + val bytesRead = runAndReturnBytesRead { + sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable], + classOf[Text]).count() + } + val bytesRead2 = runAndReturnBytesRead { + sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable], + classOf[Text]).coalesce(5).count() + } + assert(bytesRead != 0) + assert(bytesRead2 == bytesRead) + assert(bytesRead >= tmpFile.length()) + } + + test("input metrics when reading text file") { + val bytesRead = runAndReturnBytesRead { + sc.textFile(tmpFilePath, 2).count() + } + assert(bytesRead >= tmpFile.length()) + } + + test("input metrics with interleaved reads") { + val numPartitions = 2 + val cartVector = 0 to 9 + val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt") + val cartFilePath = "file://" + cartFile.getAbsolutePath + + // write files to disk so we can read them later. + sc.parallelize(cartVector).saveAsTextFile(cartFilePath) + val aRdd = sc.textFile(cartFilePath, numPartitions) + + val tmpRdd = sc.textFile(tmpFilePath, numPartitions) + + val firstSize= runAndReturnBytesRead { + aRdd.count() + } + val secondSize = runAndReturnBytesRead { + tmpRdd.count() + } + + val cartesianBytes = runAndReturnBytesRead { + aRdd.cartesian(tmpRdd).count() + } + + // Computing the amount of bytes read for a cartesian operation is a little involved. + // Cartesian interleaves reads between two partitions eg. p1 and p2. + // Here are the steps: + // 1) First it creates an iterator for p1 + // 2) Creates an iterator for p2 + // 3) Reads the first element of p1 and then all the elements of p2 + // 4) proceeds to the next element of p1 + // 5) Creates a new iterator for p2 + // 6) rinse and repeat. + // As a result we read from the second partition n times where n is the number of keys in + // p1. Thus the math below for the test. + assert(cartesianBytes != 0) + assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize)) + } + + private def runAndReturnBytesRead(job : => Unit): Long = { val taskBytesRead = new ArrayBuffer[Long]() sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead } }) - sc.textFile("file://" + file.getAbsolutePath, 2).count() - // Wait for task end events to come in + job + sc.listenerBus.waitUntilEmpty(500) - assert(taskBytesRead.length == 2) - assert(taskBytesRead.sum >= file.length()) + taskBytesRead.sum } test("output metrics when writing text file") { diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 12af60caf7..f865d8ca04 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -231,8 +231,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskMetrics.diskBytesSpilled = base + 5 taskMetrics.memoryBytesSpilled = base + 6 val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - taskMetrics.inputMetrics = Some(inputMetrics) - inputMetrics.bytesRead = base + 7 + taskMetrics.setInputMetrics(Some(inputMetrics)) + inputMetrics.addBytesRead(base + 7) val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) taskMetrics.outputMetrics = Some(outputMetrics) outputMetrics.bytesWritten = base + 8 diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 5ba94ff67d..71dfed1289 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -630,8 +630,8 @@ class JsonProtocolSuite extends FunSuite { if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - inputMetrics.bytesRead = d + e + f - t.inputMetrics = Some(inputMetrics) + inputMetrics.addBytesRead(d + e + f) + t.setInputMetrics(Some(inputMetrics)) } else { val sr = new ShuffleReadMetrics sr.remoteBytesRead = b + d -- cgit v1.2.3