-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 103
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 30
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 28

17 files changed, 149 insertions(+), 103 deletions(-)
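Every hunk below applies the same refactoring: a public mutable var on a metrics class becomes a private backing field exposed through a getter, with mutation restricted to narrowly scoped set*/inc*/dec* methods. The following standalone sketch illustrates that pattern; the package, class, and field names are placeholders chosen for the example and are not part of the actual Spark source (the package declaration is only there so the private[spark] qualifier compiles):

    package org.apache.spark.example

    // Illustrative sketch of the encapsulation pattern applied throughout this patch.
    class ExampleMetrics extends Serializable {
      // The backing field is private; callers read it via the getter and mutate it
      // only through the package-private methods below.
      private var _bytesSpilled: Long = 0L
      def bytesSpilled: Long = _bytesSpilled
      private[spark] def setBytesSpilled(value: Long): Unit = _bytesSpilled = value
      private[spark] def incBytesSpilled(value: Long): Unit = _bytesSpilled += value
      private[spark] def decBytesSpilled(value: Long): Unit = _bytesSpilled -= value
    }

Call sites that previously wrote to the field directly (for example, metrics.memoryBytesSpilled += n) now go through the mutators (metrics.incMemoryBytesSpilled(n)), which is the mechanical change visible in Aggregator, CoGroupedRDD, HashShuffleReader, ExternalSorter, and the other files below.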
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 09eb9605fb..3b684bbece 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non optional in a future release
Option(context).foreach { c =>
- c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
- c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+ c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+ c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
- c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
- c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+ c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+ c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index bad40e6529..4ac666c54f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -125,8 +125,8 @@ private[spark] class PythonRDD(
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
- context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+ context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
+ context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6660b98eb8..42566d1a14 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -203,10 +203,10 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
- m.executorDeserializeTime = taskStart - deserializeStartTime
- m.executorRunTime = taskFinish - taskStart
- m.jvmGCTime = gcTime - startGCTime
- m.resultSerializationTime = afterSerialization - beforeSerialization
+ m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
+ m.setExecutorRunTime(taskFinish - taskStart)
+ m.setJvmGCTime(gcTime - startGCTime)
+ m.setResultSerializationTime(afterSerialization - beforeSerialization)
}
val accumUpdates = Accumulators.values
@@ -257,8 +257,8 @@ private[spark] class Executor(
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
- m.executorRunTime = serviceTime
- m.jvmGCTime = gcTime - startGCTime
+ m.setExecutorRunTime(serviceTime)
+ m.setJvmGCTime(gcTime - startGCTime)
}
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -376,11 +376,12 @@ private[spark] class Executor(
val curGCTime = gcTime
for (taskRunner <- runningTasks.values()) {
- if (!taskRunner.attemptedTask.isEmpty) {
+ if (taskRunner.attemptedTask.nonEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
- metrics.updateShuffleReadMetrics
+ metrics.updateShuffleReadMetrics()
metrics.updateInputMetrics()
- metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
+ metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
+
if (isLocal) {
// JobProgressListener will hold a reference of it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 7eb10f95e0..ddb5903bf6 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,42 +44,62 @@ class TaskMetrics extends Serializable {
/**
* Host's name the task runs on
*/
- var hostname: String = _
-
+ private var _hostname: String = _
+ def hostname = _hostname
+ private[spark] def setHostname(value: String) = _hostname = value
+
/**
* Time taken on the executor to deserialize this task
*/
- var executorDeserializeTime: Long = _
-
+ private var _executorDeserializeTime: Long = _
+ def executorDeserializeTime = _executorDeserializeTime
+ private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
+
+
/**
* Time the executor spends actually running the task (including fetching shuffle data)
*/
- var executorRunTime: Long = _
-
+ private var _executorRunTime: Long = _
+ def executorRunTime = _executorRunTime
+ private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value
+
/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
- var resultSize: Long = _
+ private var _resultSize: Long = _
+ def resultSize = _resultSize
+ private[spark] def setResultSize(value: Long) = _resultSize = value
+
/**
* Amount of time the JVM spent in garbage collection while executing this task
*/
- var jvmGCTime: Long = _
+ private var _jvmGCTime: Long = _
+ def jvmGCTime = _jvmGCTime
+ private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value
/**
* Amount of time spent serializing the task result
*/
- var resultSerializationTime: Long = _
+ private var _resultSerializationTime: Long = _
+ def resultSerializationTime = _resultSerializationTime
+ private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value
/**
* The number of in-memory bytes spilled by this task
*/
- var memoryBytesSpilled: Long = _
+ private var _memoryBytesSpilled: Long = _
+ def memoryBytesSpilled = _memoryBytesSpilled
+ private[spark] def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
+ private[spark] def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value
/**
* The number of on-disk bytes spilled by this task
*/
- var diskBytesSpilled: Long = _
+ private var _diskBytesSpilled: Long = _
+ def diskBytesSpilled = _diskBytesSpilled
+ def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
+ def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value
/**
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
@@ -178,10 +198,10 @@ class TaskMetrics extends Serializable {
private[spark] def updateShuffleReadMetrics() = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
- merged.fetchWaitTime += depMetrics.fetchWaitTime
- merged.localBlocksFetched += depMetrics.localBlocksFetched
- merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
- merged.remoteBytesRead += depMetrics.remoteBytesRead
+ merged.incFetchWaitTime(depMetrics.fetchWaitTime)
+ merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
+ merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
+ merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
}
_shuffleReadMetrics = Some(merged)
}
@@ -265,7 +285,9 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
* Total bytes written
*/
- var bytesWritten: Long = 0L
+ private var _bytesWritten: Long = _
+ def bytesWritten = _bytesWritten
+ private[spark] def setBytesWritten(value: Long) = _bytesWritten = value
}
/**
@@ -275,31 +297,44 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
- * Number of blocks fetched in this shuffle by this task (remote or local)
- */
- def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
-
- /**
* Number of remote blocks fetched in this shuffle by this task
*/
- var remoteBlocksFetched: Int = _
-
+ private var _remoteBlocksFetched: Int = _
+ def remoteBlocksFetched = _remoteBlocksFetched
+ private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
+ private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+
/**
* Number of local blocks fetched in this shuffle by this task
*/
- var localBlocksFetched: Int = _
+ private var _localBlocksFetched: Int = _
+ def localBlocksFetched = _localBlocksFetched
+ private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
+ private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+
/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
* blocking on shuffle input data. For instance if block B is being fetched while the task is
* still not finished processing block A, it is not considered to be blocking on block B.
*/
- var fetchWaitTime: Long = _
-
+ private var _fetchWaitTime: Long = _
+ def fetchWaitTime = _fetchWaitTime
+ private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
+ private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+
/**
* Total number of remote bytes read from the shuffle by this task
*/
- var remoteBytesRead: Long = _
+ private var _remoteBytesRead: Long = _
+ def remoteBytesRead = _remoteBytesRead
+ private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
+ private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+
+ /**
+ * Number of blocks fetched in this shuffle by this task (remote or local)
+ */
+ def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
}
/**
@@ -311,10 +346,18 @@ class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
*/
- @volatile var shuffleBytesWritten: Long = _
-
+ @volatile private var _shuffleBytesWritten: Long = _
+ def shuffleBytesWritten = _shuffleBytesWritten
+ private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
+ private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+
/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
- @volatile var shuffleWriteTime: Long = _
+ @volatile private var _shuffleWriteTime: Long = _
+ def shuffleWriteTime = _shuffleWriteTime
+ private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
+ private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+
+
}
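The updateShuffleReadMetrics() hunk above now folds each dependency's readings into a fresh aggregate through the new inc* mutators instead of += on public vars. Below is a minimal standalone sketch of that fold, using a placeholder class (not the real ShuffleReadMetrics) with the same accessor naming, assuming the same placeholder package as the earlier sketch:

    package org.apache.spark.example

    // Placeholder stand-in for ShuffleReadMetrics, used only to illustrate the merge loop.
    class SimpleReadMetrics {
      private var _remoteBytesRead: Long = 0L
      private var _fetchWaitTime: Long = 0L
      def remoteBytesRead: Long = _remoteBytesRead
      def fetchWaitTime: Long = _fetchWaitTime
      private[spark] def incRemoteBytesRead(value: Long): Unit = _remoteBytesRead += value
      private[spark] def incFetchWaitTime(value: Long): Unit = _fetchWaitTime += value
    }

    object MergeSketch {
      // Mirrors the loop in the patched updateShuffleReadMetrics(): build one merged
      // object and accumulate every per-dependency reading into it.
      def merge(deps: Seq[SimpleReadMetrics]): SimpleReadMetrics = {
        val merged = new SimpleReadMetrics()
        for (dep <- deps) {
          merged.incRemoteBytesRead(dep.remoteBytesRead)
          merged.incFetchWaitTime(dep.fetchWaitTime)
        }
        merged
      }
    }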
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 70edf191d9..07398a6fa6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -159,8 +159,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
for ((it, depNum) <- rddIterators) {
map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
- context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled
+ context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
+ context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3b99d3a6ca..056aef0bc2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -245,6 +245,7 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
+
(key, value)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 890ec677c2..7b0e3c87cc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -154,6 +154,7 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
+
(reader.getCurrentKey, reader.getCurrentValue)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e43e506665..0f37d830ef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1007,7 +1007,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
- bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+ bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
1
} : Int
@@ -1079,7 +1079,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close()
}
writer.commit()
- bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+ bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
}
self.context.runJob(self, writeToFile)
@@ -1102,7 +1102,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
&& bytesWrittenCallback.isDefined) {
- bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+ bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 2367f7e2cf..847a4912ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -55,7 +55,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
TaskContextHelper.setTaskContext(context)
- context.taskMetrics.hostname = Utils.localHostName()
+ context.taskMetrics.setHostname(Utils.localHostName())
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 4896ec845b..774f3d8cdb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
(deserializedResult, size)
}
- result.metrics.resultSize = size
+ result.metrics.setResultSize(size)
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index de72148ccc..41bafabde0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -59,8 +59,8 @@ private[spark] class HashShuffleReader[K, C](
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.insertAll(aggregatedIter)
- context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
+ context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+ context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
sorter.iterator
case None =>
aggregatedIter
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 9c469370ff..3198d766fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -160,14 +160,14 @@ private[spark] class DiskBlockObjectWriter(
}
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
- writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
+ writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
}
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
- writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)
+ writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
if (initialized) {
objOut.flush()
@@ -212,14 +212,14 @@ private[spark] class DiskBlockObjectWriter(
*/
private def updateBytesWritten() {
val pos = channel.position()
- writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
+ writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
reportedPosition = pos
}
private def callWithTiming(f: => Unit) = {
val start = System.nanoTime()
f
- writeMetrics.shuffleWriteTime += (System.nanoTime() - start)
+ writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
}
// For testing
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 2499c11a65..ab9ee4f009 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -156,8 +156,8 @@ final class ShuffleBlockFetcherIterator(
// This needs to be released after use.
buf.retain()
results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
- shuffleMetrics.remoteBytesRead += buf.size
- shuffleMetrics.remoteBlocksFetched += 1
+ shuffleMetrics.incRemoteBytesRead(buf.size)
+ shuffleMetrics.incRemoteBlocksFetched(1)
}
logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}
@@ -233,7 +233,7 @@ final class ShuffleBlockFetcherIterator(
val blockId = iter.next()
try {
val buf = blockManager.getBlockData(blockId)
- shuffleMetrics.localBlocksFetched += 1
+ shuffleMetrics.incLocalBlocksFetched(1)
buf.retain()
results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
@@ -277,7 +277,7 @@ final class ShuffleBlockFetcherIterator(
currentResult = results.take()
val result = currentResult
val stopFetchWait = System.currentTimeMillis()
- shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
+ shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
result match {
case SuccessFetchResult(_, size, _) => bytesInFlight -= size
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 76709a230f..f896b5072e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -632,14 +632,14 @@ private[spark] object JsonProtocol {
return TaskMetrics.empty
}
val metrics = new TaskMetrics
- metrics.hostname = (json \ "Host Name").extract[String]
- metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
- metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
- metrics.resultSize = (json \ "Result Size").extract[Long]
- metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
- metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
- metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
- metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
+ metrics.setHostname((json \ "Host Name").extract[String])
+ metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+ metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
+ metrics.setResultSize((json \ "Result Size").extract[Long])
+ metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
+ metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
+ metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
+ metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
metrics.setShuffleReadMetrics(
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
@@ -661,17 +661,17 @@ private[spark] object JsonProtocol {
def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics
- metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
- metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
- metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
- metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long]
+ metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int])
+ metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
+ metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
+ metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
metrics
}
def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
val metrics = new ShuffleWriteMetrics
- metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
- metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
+ metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
+ metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
metrics
}
@@ -685,7 +685,7 @@ private[spark] object JsonProtocol {
def outputMetricsFromJson(json: JValue): OutputMetrics = {
val metrics = new OutputMetrics(
DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
- metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
+ metrics.setBytesWritten((json \ "Bytes Written").extract[Long])
metrics
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 15bda1c9cc..6ba03841f7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -757,12 +757,12 @@ private[spark] class ExternalSorter[K, V, C](
}
}
- context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+ context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
+ context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
if (curWriteMetrics != null) {
- m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
- m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
+ m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
+ m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index c9417ea1ed..68074ae32a 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -140,7 +140,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
- shuffleReadMetrics.remoteBytesRead = 1000
+ shuffleReadMetrics.incRemoteBytesRead(1000)
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
@@ -226,18 +226,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val shuffleWriteMetrics = new ShuffleWriteMetrics()
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
- shuffleReadMetrics.remoteBytesRead = base + 1
- shuffleReadMetrics.remoteBlocksFetched = base + 2
- shuffleWriteMetrics.shuffleBytesWritten = base + 3
- taskMetrics.executorRunTime = base + 4
- taskMetrics.diskBytesSpilled = base + 5
- taskMetrics.memoryBytesSpilled = base + 6
+ shuffleReadMetrics.incRemoteBytesRead(base + 1)
+ shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
+ shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
+ taskMetrics.setExecutorRunTime(base + 4)
+ taskMetrics.incDiskBytesSpilled(base + 5)
+ taskMetrics.incMemoryBytesSpilled(base + 6)
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.setInputMetrics(Some(inputMetrics))
inputMetrics.addBytesRead(base + 7)
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)
- outputMetrics.bytesWritten = base + 8
+ outputMetrics.setBytesWritten(base + 8)
taskMetrics
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index db400b4162..0357fc6ce2 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -641,13 +641,13 @@ class JsonProtocolSuite extends FunSuite {
hasHadoopInput: Boolean,
hasOutput: Boolean) = {
val t = new TaskMetrics
- t.hostname = "localhost"
- t.executorDeserializeTime = a
- t.executorRunTime = b
- t.resultSize = c
- t.jvmGCTime = d
- t.resultSerializationTime = a + b
- t.memoryBytesSpilled = a + c
+ t.setHostname("localhost")
+ t.setExecutorDeserializeTime(a)
+ t.setExecutorRunTime(b)
+ t.setResultSize(c)
+ t.setJvmGCTime(d)
+ t.setResultSerializationTime(a + b)
+ t.incMemoryBytesSpilled(a + c)
if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
@@ -655,20 +655,20 @@ class JsonProtocolSuite extends FunSuite {
t.setInputMetrics(Some(inputMetrics))
} else {
val sr = new ShuffleReadMetrics
- sr.remoteBytesRead = b + d
- sr.localBlocksFetched = e
- sr.fetchWaitTime = a + d
- sr.remoteBlocksFetched = f
+ sr.incRemoteBytesRead(b + d)
+ sr.incLocalBlocksFetched(e)
+ sr.incFetchWaitTime(a + d)
+ sr.incRemoteBlocksFetched(f)
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
- outputMetrics.bytesWritten = a + b + c
+ outputMetrics.setBytesWritten(a + b + c)
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
- sw.shuffleBytesWritten = a + b + c
- sw.shuffleWriteTime = b + c + d
+ sw.incShuffleBytesWritten(a + b + c)
+ sw.incShuffleWriteTime(b + c + d)
t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks