From 3453d578ad9933be6881488c8ca3611e5b686af9 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 19 Jan 2015 01:32:22 -0800 Subject: [SPARK-3288] All fields in TaskMetrics should be private and use getters/setters I've updated the fields and all usages of these fields in the Spark code. I've verified that this did not break anything on my local repo. Author: Ilya Ganelin Closes #4020 from ilganeli/SPARK-3288 and squashes the following commits: 39f3810 [Ilya Ganelin] resolved merge issues e446287 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3288 b8c05cb [Ilya Ganelin] Missed making a variable private 6444391 [Ilya Ganelin] Made inc/dec functions private[spark] 1149e78 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3288 26b312b [Ilya Ganelin] Debugging tests 17146c2 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-3288 5525c20 [Ilya Ganelin] Completed refactoring to make vars in TaskMetrics class private c64da4f [Ilya Ganelin] Partially updated task metrics to make some vars private --- .../main/scala/org/apache/spark/Aggregator.scala | 8 +- .../org/apache/spark/api/python/PythonRDD.scala | 4 +- .../scala/org/apache/spark/executor/Executor.scala | 19 ++-- .../org/apache/spark/executor/TaskMetrics.scala | 103 +++++++++++++++------ .../scala/org/apache/spark/rdd/CoGroupedRDD.scala | 4 +- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 1 + .../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 1 + .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 +- .../scala/org/apache/spark/scheduler/Task.scala | 2 +- .../apache/spark/scheduler/TaskResultGetter.scala | 2 +- .../spark/shuffle/hash/HashShuffleReader.scala | 4 +- .../apache/spark/storage/BlockObjectWriter.scala | 8 +- .../storage/ShuffleBlockFetcherIterator.scala | 8 +- .../scala/org/apache/spark/util/JsonProtocol.scala | 30 +++--- .../spark/util/collection/ExternalSorter.scala | 8 +- .../spark/ui/jobs/JobProgressListenerSuite.scala | 16 ++-- .../org/apache/spark/util/JsonProtocolSuite.scala | 28 +++--- 17 files changed, 149 insertions(+), 103 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 09eb9605fb..3b684bbece 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -61,8 +61,8 @@ case class Aggregator[K, V, C] ( // Update task metrics if context is not null // TODO: Make context non optional in a future release Option(context).foreach { c => - c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled - c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled + c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled) + c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled) } combiners.iterator } @@ -95,8 +95,8 @@ case class Aggregator[K, V, C] ( // Update task metrics if context is not null // TODO: Make context non-optional in a future release Option(context).foreach { c => - c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled - c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled + c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled) + c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled) } combiners.iterator } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index bad40e6529..4ac666c54f 100644 --- 
a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -125,8 +125,8 @@ private[spark] class PythonRDD( init, finish)) val memoryBytesSpilled = stream.readLong() val diskBytesSpilled = stream.readLong() - context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) read() case SpecialLengths.PYTHON_EXCEPTION_THROWN => // Signals that an exception has been thrown in python diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 6660b98eb8..42566d1a14 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -203,10 +203,10 @@ private[spark] class Executor( val afterSerialization = System.currentTimeMillis() for (m <- task.metrics) { - m.executorDeserializeTime = taskStart - deserializeStartTime - m.executorRunTime = taskFinish - taskStart - m.jvmGCTime = gcTime - startGCTime - m.resultSerializationTime = afterSerialization - beforeSerialization + m.setExecutorDeserializeTime(taskStart - deserializeStartTime) + m.setExecutorRunTime(taskFinish - taskStart) + m.setJvmGCTime(gcTime - startGCTime) + m.setResultSerializationTime(afterSerialization - beforeSerialization) } val accumUpdates = Accumulators.values @@ -257,8 +257,8 @@ private[spark] class Executor( val serviceTime = System.currentTimeMillis() - taskStart val metrics = attemptedTask.flatMap(t => t.metrics) for (m <- metrics) { - m.executorRunTime = serviceTime - m.jvmGCTime = gcTime - startGCTime + m.setExecutorRunTime(serviceTime) + m.setJvmGCTime(gcTime - startGCTime) } val reason = new ExceptionFailure(t, metrics) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) @@ -376,11 +376,12 @@ private[spark] class Executor( val curGCTime = gcTime for (taskRunner <- runningTasks.values()) { - if (!taskRunner.attemptedTask.isEmpty) { + if (taskRunner.attemptedTask.nonEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => - metrics.updateShuffleReadMetrics + metrics.updateShuffleReadMetrics() metrics.updateInputMetrics() - metrics.jvmGCTime = curGCTime - taskRunner.startGCTime + metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) + if (isLocal) { // JobProgressListener will hold an reference of it during // onExecutorMetricsUpdate(), then JobProgressListener can not see diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 7eb10f95e0..ddb5903bf6 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -44,42 +44,62 @@ class TaskMetrics extends Serializable { /** * Host's name the task runs on */ - var hostname: String = _ - + private var _hostname: String = _ + def hostname = _hostname + private[spark] def setHostname(value: String) = _hostname = value + /** * Time taken on the executor to deserialize this task */ - var executorDeserializeTime: Long = _ - + private var _executorDeserializeTime: Long = _ + def executorDeserializeTime = _executorDeserializeTime + private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value + + /** * Time the executor 
spends actually running the task (including fetching shuffle data) */ - var executorRunTime: Long = _ - + private var _executorRunTime: Long = _ + def executorRunTime = _executorRunTime + private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value + /** * The number of bytes this task transmitted back to the driver as the TaskResult */ - var resultSize: Long = _ + private var _resultSize: Long = _ + def resultSize = _resultSize + private[spark] def setResultSize(value: Long) = _resultSize = value + /** * Amount of time the JVM spent in garbage collection while executing this task */ - var jvmGCTime: Long = _ + private var _jvmGCTime: Long = _ + def jvmGCTime = _jvmGCTime + private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value /** * Amount of time spent serializing the task result */ - var resultSerializationTime: Long = _ + private var _resultSerializationTime: Long = _ + def resultSerializationTime = _resultSerializationTime + private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value /** * The number of in-memory bytes spilled by this task */ - var memoryBytesSpilled: Long = _ + private var _memoryBytesSpilled: Long = _ + def memoryBytesSpilled = _memoryBytesSpilled + private[spark] def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value + private[spark] def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value /** * The number of on-disk bytes spilled by this task */ - var diskBytesSpilled: Long = _ + private var _diskBytesSpilled: Long = _ + def diskBytesSpilled = _diskBytesSpilled + def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value + def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value /** * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read @@ -178,10 +198,10 @@ class TaskMetrics extends Serializable { private[spark] def updateShuffleReadMetrics() = synchronized { val merged = new ShuffleReadMetrics() for (depMetrics <- depsShuffleReadMetrics) { - merged.fetchWaitTime += depMetrics.fetchWaitTime - merged.localBlocksFetched += depMetrics.localBlocksFetched - merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched - merged.remoteBytesRead += depMetrics.remoteBytesRead + merged.incFetchWaitTime(depMetrics.fetchWaitTime) + merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) + merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) + merged.incRemoteBytesRead(depMetrics.remoteBytesRead) } _shuffleReadMetrics = Some(merged) } @@ -265,7 +285,9 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { /** * Total bytes written */ - var bytesWritten: Long = 0L + private var _bytesWritten: Long = _ + def bytesWritten = _bytesWritten + private[spark] def setBytesWritten(value : Long) = _bytesWritten = value } /** @@ -274,32 +296,45 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { */ @DeveloperApi class ShuffleReadMetrics extends Serializable { - /** - * Number of blocks fetched in this shuffle by this task (remote or local) - */ - def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched - /** * Number of remote blocks fetched in this shuffle by this task */ - var remoteBlocksFetched: Int = _ - + private var _remoteBlocksFetched: Int = _ + def remoteBlocksFetched = _remoteBlocksFetched + private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value + private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value + /** * Number of 
local blocks fetched in this shuffle by this task */ - var localBlocksFetched: Int = _ + private var _localBlocksFetched: Int = _ + def localBlocksFetched = _localBlocksFetched + private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value + private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value + /** * Time the task spent waiting for remote shuffle blocks. This only includes the time * blocking on shuffle input data. For instance if block B is being fetched while the task is * still not finished processing block A, it is not considered to be blocking on block B. */ - var fetchWaitTime: Long = _ - + private var _fetchWaitTime: Long = _ + def fetchWaitTime = _fetchWaitTime + private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value + private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value + /** * Total number of remote bytes read from the shuffle by this task */ - var remoteBytesRead: Long = _ + private var _remoteBytesRead: Long = _ + def remoteBytesRead = _remoteBytesRead + private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value + private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value + + /** + * Number of blocks fetched in this shuffle by this task (remote or local) + */ + def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched } /** @@ -311,10 +346,18 @@ class ShuffleWriteMetrics extends Serializable { /** * Number of bytes written for the shuffle by this task */ - @volatile var shuffleBytesWritten: Long = _ - + @volatile private var _shuffleBytesWritten: Long = _ + def shuffleBytesWritten = _shuffleBytesWritten + private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value + private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value + /** * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds */ - @volatile var shuffleWriteTime: Long = _ + @volatile private var _shuffleWriteTime: Long = _ + def shuffleWriteTime = _shuffleWriteTime + private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value + private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value + + } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 70edf191d9..07398a6fa6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -159,8 +159,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: for ((it, depNum) <- rddIterators) { map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) } - context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled) new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 3b99d3a6ca..056aef0bc2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -245,6 +245,7 @@ class HadoopRDD[K, V]( case eof: EOFException => finished = true } + (key, value) } diff --git 
a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 890ec677c2..7b0e3c87cc 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -154,6 +154,7 @@ class NewHadoopRDD[K, V]( throw new java.util.NoSuchElementException("End of stream") } havePair = false + (reader.getCurrentKey, reader.getCurrentValue) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index e43e506665..0f37d830ef 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1007,7 +1007,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close(hadoopContext) } committer.commitTask(hadoopContext) - bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } + bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } 1 } : Int @@ -1079,7 +1079,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close() } writer.commit() - bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } + bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } } self.context.runJob(self, writeToFile) @@ -1102,7 +1102,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0 && bytesWrittenCallback.isDefined) { - bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() } + bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 2367f7e2cf..847a4912ee 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -55,7 +55,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex context = new TaskContextImpl(stageId = stageId, partitionId = partitionId, taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false) TaskContextHelper.setTaskContext(context) - context.taskMetrics.hostname = Utils.localHostName() + context.taskMetrics.setHostname(Utils.localHostName()) taskThread = Thread.currentThread() if (_killed) { kill(interruptThread = false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 4896ec845b..774f3d8cdb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul (deserializedResult, size) } - result.metrics.resultSize = size + result.metrics.setResultSize(size) scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { case cnf: ClassNotFoundException => diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index de72148ccc..41bafabde0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -59,8 +59,8 @@ private[spark] class HashShuffleReader[K, C]( // the ExternalSorter won't spill to disk. val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) sorter.insertAll(aggregatedIter) - context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) sorter.iterator case None => aggregatedIter diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 9c469370ff..3198d766fc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -160,14 +160,14 @@ private[spark] class DiskBlockObjectWriter( } finalPosition = file.length() // In certain compression codecs, more bytes are written after close() is called - writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition) + writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition) } // Discard current writes. We do this by flushing the outstanding writes and then // truncating the file to its initial position. override def revertPartialWritesAndClose() { try { - writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition) + writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition) if (initialized) { objOut.flush() @@ -212,14 +212,14 @@ private[spark] class DiskBlockObjectWriter( */ private def updateBytesWritten() { val pos = channel.position() - writeMetrics.shuffleBytesWritten += (pos - reportedPosition) + writeMetrics.incShuffleBytesWritten(pos - reportedPosition) reportedPosition = pos } private def callWithTiming(f: => Unit) = { val start = System.nanoTime() f - writeMetrics.shuffleWriteTime += (System.nanoTime() - start) + writeMetrics.incShuffleWriteTime(System.nanoTime() - start) } // For testing diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 2499c11a65..ab9ee4f009 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -156,8 +156,8 @@ final class ShuffleBlockFetcherIterator( // This needs to be released after use. 
buf.retain() results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf)) - shuffleMetrics.remoteBytesRead += buf.size - shuffleMetrics.remoteBlocksFetched += 1 + shuffleMetrics.incRemoteBytesRead(buf.size) + shuffleMetrics.incRemoteBlocksFetched(1) } logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } @@ -233,7 +233,7 @@ final class ShuffleBlockFetcherIterator( val blockId = iter.next() try { val buf = blockManager.getBlockData(blockId) - shuffleMetrics.localBlocksFetched += 1 + shuffleMetrics.incLocalBlocksFetched(1) buf.retain() results.put(new SuccessFetchResult(blockId, 0, buf)) } catch { @@ -277,7 +277,7 @@ final class ShuffleBlockFetcherIterator( currentResult = results.take() val result = currentResult val stopFetchWait = System.currentTimeMillis() - shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait) + shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) result match { case SuccessFetchResult(_, size, _) => bytesInFlight -= size diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 76709a230f..f896b5072e 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -632,14 +632,14 @@ private[spark] object JsonProtocol { return TaskMetrics.empty } val metrics = new TaskMetrics - metrics.hostname = (json \ "Host Name").extract[String] - metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long] - metrics.executorRunTime = (json \ "Executor Run Time").extract[Long] - metrics.resultSize = (json \ "Result Size").extract[Long] - metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long] - metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long] - metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long] - metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long] + metrics.setHostname((json \ "Host Name").extract[String]) + metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long]) + metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long]) + metrics.setResultSize((json \ "Result Size").extract[Long]) + metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long]) + metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long]) + metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long]) + metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long]) metrics.setShuffleReadMetrics( Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = @@ -661,17 +661,17 @@ private[spark] object JsonProtocol { def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { val metrics = new ShuffleReadMetrics - metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int] - metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int] - metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long] - metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long] + metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int]) + metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) + metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) + metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) metrics } def 
shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = { val metrics = new ShuffleWriteMetrics - metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long] - metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long] + metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) + metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) metrics } @@ -685,7 +685,7 @@ private[spark] object JsonProtocol { def outputMetricsFromJson(json: JValue): OutputMetrics = { val metrics = new OutputMetrics( DataWriteMethod.withName((json \ "Data Write Method").extract[String])) - metrics.bytesWritten = (json \ "Bytes Written").extract[Long] + metrics.setBytesWritten((json \ "Bytes Written").extract[Long]) metrics } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 15bda1c9cc..6ba03841f7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -757,12 +757,12 @@ private[spark] class ExternalSorter[K, V, C]( } } - context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m => if (curWriteMetrics != null) { - m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten - m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime + m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten) + m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime) } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index c9417ea1ed..68074ae32a 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -140,7 +140,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(listener.stageIdToData.size === 0) // finish this task, should get updated shuffleRead - shuffleReadMetrics.remoteBytesRead = 1000 + shuffleReadMetrics.incRemoteBytesRead(1000) taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 @@ -226,18 +226,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val shuffleWriteMetrics = new ShuffleWriteMetrics() taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics) - shuffleReadMetrics.remoteBytesRead = base + 1 - shuffleReadMetrics.remoteBlocksFetched = base + 2 - shuffleWriteMetrics.shuffleBytesWritten = base + 3 - taskMetrics.executorRunTime = base + 4 - taskMetrics.diskBytesSpilled = base + 5 - taskMetrics.memoryBytesSpilled = base + 6 + shuffleReadMetrics.incRemoteBytesRead(base + 1) + shuffleReadMetrics.incRemoteBlocksFetched(base + 2) + shuffleWriteMetrics.incShuffleBytesWritten(base + 3) + taskMetrics.setExecutorRunTime(base + 4) + taskMetrics.incDiskBytesSpilled(base + 5) + taskMetrics.incMemoryBytesSpilled(base + 6) val inputMetrics = new 
InputMetrics(DataReadMethod.Hadoop) taskMetrics.setInputMetrics(Some(inputMetrics)) inputMetrics.addBytesRead(base + 7) val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) taskMetrics.outputMetrics = Some(outputMetrics) - outputMetrics.bytesWritten = base + 8 + outputMetrics.setBytesWritten(base + 8) taskMetrics } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index db400b4162..0357fc6ce2 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -641,13 +641,13 @@ class JsonProtocolSuite extends FunSuite { hasHadoopInput: Boolean, hasOutput: Boolean) = { val t = new TaskMetrics - t.hostname = "localhost" - t.executorDeserializeTime = a - t.executorRunTime = b - t.resultSize = c - t.jvmGCTime = d - t.resultSerializationTime = a + b - t.memoryBytesSpilled = a + c + t.setHostname("localhost") + t.setExecutorDeserializeTime(a) + t.setExecutorRunTime(b) + t.setResultSize(c) + t.setJvmGCTime(d) + t.setResultSerializationTime(a + b) + t.incMemoryBytesSpilled(a + c) if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) @@ -655,20 +655,20 @@ class JsonProtocolSuite extends FunSuite { t.setInputMetrics(Some(inputMetrics)) } else { val sr = new ShuffleReadMetrics - sr.remoteBytesRead = b + d - sr.localBlocksFetched = e - sr.fetchWaitTime = a + d - sr.remoteBlocksFetched = f + sr.incRemoteBytesRead(b + d) + sr.incLocalBlocksFetched(e) + sr.incFetchWaitTime(a + d) + sr.incRemoteBlocksFetched(f) t.setShuffleReadMetrics(Some(sr)) } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) - outputMetrics.bytesWritten = a + b + c + outputMetrics.setBytesWritten(a + b + c) t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics - sw.shuffleBytesWritten = a + b + c - sw.shuffleWriteTime = b + c + d + sw.incShuffleBytesWritten(a + b + c) + sw.incShuffleWriteTime(b + c + d) t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks -- cgit v1.2.3
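
For reference, the accessor pattern this patch applies across TaskMetrics, ShuffleReadMetrics and ShuffleWriteMetrics boils down to the minimal sketch below. It is a simplified, hypothetical class (the class name and package are illustrative; the real class lives in org.apache.spark.executor), not the actual Spark code: each metric keeps a private backing field, exposes a read-only getter, and restricts mutation to private[spark] set/inc/dec helpers so only Spark internals can update it.

package org.apache.spark.example // hypothetical package; private[spark] needs an enclosing "spark" package

class SimpleMetrics extends Serializable {

  /** Private backing field; `= _` initializes the Long to 0. */
  private var _executorRunTime: Long = _

  /** Public read-only getter. */
  def executorRunTime: Long = _executorRunTime

  /** Mutation is restricted to Spark internals via private[spark]. */
  private[spark] def setExecutorRunTime(value: Long): Unit = _executorRunTime = value

  /** Counters that accumulate get inc/dec helpers instead of a plain setter. */
  private var _memoryBytesSpilled: Long = _
  def memoryBytesSpilled: Long = _memoryBytesSpilled
  private[spark] def incMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled += value
  private[spark] def decMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled -= value
}

// Call sites inside Spark change accordingly, e.g.:
//   before: context.taskMetrics.memoryBytesSpilled += spilled
//   after:  context.taskMetrics.incMemoryBytesSpilled(spilled)

With this shape the getters are the only public surface, so listeners, the UI and the JSON layer can still read every metric, while direct mutation from outside Spark is no longer possible and all accumulation goes through one named helper per field.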