-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala                    10
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                        83
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                     10
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala      2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                7
-rw-r--r--  project/MimaExcludes.scala                                                       5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala 2
8 files changed, 84 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f2517401cb..7fde34d897 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1089,7 +1089,8 @@ class DAGScheduler(
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && !updates.isZero) {
stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
- event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
+ event.taskInfo.setAccumulables(
+ acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index eeb7963c9e..59680139e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,8 +17,6 @@
package org.apache.spark.scheduler
-import scala.collection.mutable.ListBuffer
-
import org.apache.spark.TaskState
import org.apache.spark.TaskState.TaskState
import org.apache.spark.annotation.DeveloperApi
@@ -54,7 +52,13 @@ class TaskInfo(
* accumulable to be updated multiple times in a single task or for two accumulables with the
* same name but different IDs to exist in a task.
*/
- val accumulables = ListBuffer[AccumulableInfo]()
+ def accumulables: Seq[AccumulableInfo] = _accumulables
+
+ private[this] var _accumulables: Seq[AccumulableInfo] = Nil
+
+ private[spark] def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = {
+ _accumulables = newAccumulables
+ }
/**
* The time when the task has completed successfully (including the time to remotely fetch
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index f4a04609c4..9ce8542f02 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.{HashMap, LinkedHashMap}
import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor._
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
@@ -147,9 +147,8 @@ private[spark] object UIData {
memoryBytesSpilled = m.memoryBytesSpilled,
diskBytesSpilled = m.diskBytesSpilled,
peakExecutionMemory = m.peakExecutionMemory,
- inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
- outputMetrics =
- OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
+ inputMetrics = InputMetricsUIData(m.inputMetrics),
+ outputMetrics = OutputMetricsUIData(m.outputMetrics),
shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
}
@@ -171,9 +170,9 @@ private[spark] object UIData {
speculative = taskInfo.speculative
)
newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
- newTaskInfo.accumulables ++= taskInfo.accumulables.filter {
+ newTaskInfo.setAccumulables(taskInfo.accumulables.filter {
accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
- }
+ })
newTaskInfo.finishTime = taskInfo.finishTime
newTaskInfo.failed = taskInfo.failed
newTaskInfo
@@ -197,8 +196,32 @@ private[spark] object UIData {
shuffleWriteMetrics: ShuffleWriteMetricsUIData)
case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
+ object InputMetricsUIData {
+ def apply(metrics: InputMetrics): InputMetricsUIData = {
+ if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
+ EMPTY
+ } else {
+ new InputMetricsUIData(
+ bytesRead = metrics.bytesRead,
+ recordsRead = metrics.recordsRead)
+ }
+ }
+ private val EMPTY = InputMetricsUIData(0, 0)
+ }
case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
+ object OutputMetricsUIData {
+ def apply(metrics: OutputMetrics): OutputMetricsUIData = {
+ if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {
+ EMPTY
+ } else {
+ new OutputMetricsUIData(
+ bytesWritten = metrics.bytesWritten,
+ recordsWritten = metrics.recordsWritten)
+ }
+ }
+ private val EMPTY = OutputMetricsUIData(0, 0)
+ }
case class ShuffleReadMetricsUIData(
remoteBlocksFetched: Long,
@@ -212,17 +235,30 @@ private[spark] object UIData {
object ShuffleReadMetricsUIData {
def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
- new ShuffleReadMetricsUIData(
- remoteBlocksFetched = metrics.remoteBlocksFetched,
- localBlocksFetched = metrics.localBlocksFetched,
- remoteBytesRead = metrics.remoteBytesRead,
- localBytesRead = metrics.localBytesRead,
- fetchWaitTime = metrics.fetchWaitTime,
- recordsRead = metrics.recordsRead,
- totalBytesRead = metrics.totalBytesRead,
- totalBlocksFetched = metrics.totalBlocksFetched
- )
+ if (
+ metrics.remoteBlocksFetched == 0 &&
+ metrics.localBlocksFetched == 0 &&
+ metrics.remoteBytesRead == 0 &&
+ metrics.localBytesRead == 0 &&
+ metrics.fetchWaitTime == 0 &&
+ metrics.recordsRead == 0 &&
+ metrics.totalBytesRead == 0 &&
+ metrics.totalBlocksFetched == 0) {
+ EMPTY
+ } else {
+ new ShuffleReadMetricsUIData(
+ remoteBlocksFetched = metrics.remoteBlocksFetched,
+ localBlocksFetched = metrics.localBlocksFetched,
+ remoteBytesRead = metrics.remoteBytesRead,
+ localBytesRead = metrics.localBytesRead,
+ fetchWaitTime = metrics.fetchWaitTime,
+ recordsRead = metrics.recordsRead,
+ totalBytesRead = metrics.totalBytesRead,
+ totalBlocksFetched = metrics.totalBlocksFetched
+ )
+ }
}
+ private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0)
}
case class ShuffleWriteMetricsUIData(
@@ -232,12 +268,17 @@ private[spark] object UIData {
object ShuffleWriteMetricsUIData {
def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
- new ShuffleWriteMetricsUIData(
- bytesWritten = metrics.bytesWritten,
- recordsWritten = metrics.recordsWritten,
- writeTime = metrics.writeTime
- )
+ if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) {
+ EMPTY
+ } else {
+ new ShuffleWriteMetricsUIData(
+ bytesWritten = metrics.bytesWritten,
+ recordsWritten = metrics.recordsWritten,
+ writeTime = metrics.writeTime
+ )
+ }
}
+ private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6593aab33f..4b4d2d10cb 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -702,8 +702,8 @@ private[spark] object JsonProtocol {
val index = (json \ "Index").extract[Int]
val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
val launchTime = (json \ "Launch Time").extract[Long]
- val executorId = (json \ "Executor ID").extract[String]
- val host = (json \ "Host").extract[String]
+ val executorId = (json \ "Executor ID").extract[String].intern()
+ val host = (json \ "Host").extract[String].intern()
val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
@@ -721,7 +721,7 @@ private[spark] object JsonProtocol {
taskInfo.finishTime = finishTime
taskInfo.failed = failed
taskInfo.killed = killed
- accumulables.foreach { taskInfo.accumulables += _ }
+ taskInfo.setAccumulables(accumulables)
taskInfo
}
@@ -903,8 +903,8 @@ private[spark] object JsonProtocol {
if (json == JNothing) {
return null
}
- val executorId = (json \ "Executor ID").extract[String]
- val host = (json \ "Host").extract[String]
+ val executorId = (json \ "Executor ID").extract[String].intern()
+ val host = (json \ "Host").extract[String].intern()
val port = (json \ "Port").extract[Int]
BlockManagerId(executorId, host, port)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 8418fa74d2..da853f1be8 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -403,7 +403,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
internal = false,
countFailedValues = false,
metadata = None)
- taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum)
+ taskInfo.setAccumulables(List(internalAccum, sqlAccum, userAccum))
val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
assert(newTaskInfo.accumulables === Seq(userAccum))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d5146d70eb..85da79180f 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -788,11 +788,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
speculative)
- val (acc1, acc2, acc3) =
- (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))
- taskInfo.accumulables += acc1
- taskInfo.accumulables += acc2
- taskInfo.accumulables += acc3
+ taskInfo.setAccumulables(
+ List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)))
taskInfo
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 350b144f82..12f7ed202b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -86,7 +86,10 @@ object MimaExcludes {
// [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
- ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
+ ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
+
+ // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 19b6d26031..948a155457 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
val taskInfo = createTaskInfo(0, 0)
- taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)
+ taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
listener.onOtherEvent(executionStart)
listener.onJobStart(jobStart)