aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-02-08 17:23:33 -0800
committerJosh Rosen <joshrosen@databricks.com>2016-02-08 17:23:33 -0800
commiteeaf45b92695c577279f3a17d8c80ee40425e9aa (patch)
tree46ab78335e446ea77ac071225af5141b570a9f9b
parentff0af0ddfa4d198b203c3a39f8532cfbd4f4e027 (diff)
downloadspark-eeaf45b92695c577279f3a17d8c80ee40425e9aa.tar.gz
spark-eeaf45b92695c577279f3a17d8c80ee40425e9aa.tar.bz2
spark-eeaf45b92695c577279f3a17d8c80ee40425e9aa.zip
[SPARK-10620][SPARK-13054] Minor addendum to #10835
Additional changes to #10835, mainly related to style and visibility. This patch also adds back a few deprecated methods for backward compatibility. Author: Andrew Or <andrew@databricks.com> Closes #10958 from andrewor14/task-metrics-to-accums-followups.
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulator.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/InternalAccumulator.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContextImpl.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/AccumulatorSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala30
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala4
-rw-r--r--project/MimaExcludes.scala3
17 files changed, 66 insertions, 49 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 558bd447e2..5e8f1d4a70 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -60,19 +60,20 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* @tparam T result type
*/
class Accumulator[T] private[spark] (
- @transient private[spark] val initialValue: T,
+ // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
+ @transient private val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean,
- override val countFailedValues: Boolean = false)
+ private[spark] override val countFailedValues: Boolean = false)
extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
- this(initialValue, param, name, false)
+ this(initialValue, param, name, false /* internal */)
}
def this(initialValue: T, param: AccumulatorParam[T]) = {
- this(initialValue, param, None, false)
+ this(initialValue, param, None, false /* internal */)
}
}
@@ -84,7 +85,7 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
- * TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
+ * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
*/
@GuardedBy("Accumulators")
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index c191122c06..7aa9057858 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -119,7 +119,7 @@ private[spark] object InternalAccumulator {
/**
* Accumulators for tracking internal metrics.
*/
- def create(): Seq[Accumulator[_]] = {
+ def createAll(): Seq[Accumulator[_]] = {
Seq[String](
EXECUTOR_DESERIALIZE_TIME,
EXECUTOR_RUN_TIME,
@@ -188,7 +188,7 @@ private[spark] object InternalAccumulator {
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[_]] = {
- val accums = create()
+ val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 27ca46f73d..1d228b6b86 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -32,7 +32,7 @@ private[spark] class TaskContextImpl(
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
- initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
+ initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends TaskContext
with Logging {
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 68340cc704..c8f201ea9e 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -118,7 +118,7 @@ case class ExceptionFailure(
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
- exceptionWrapper: Option[ThrowableSerializationWrapper],
+ private val exceptionWrapper: Option[ThrowableSerializationWrapper],
accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
extends TaskFailedReason {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 51c000ea5c..00be3a240d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -300,15 +300,15 @@ private[spark] class Executor(
// Collect latest accumulator values to report back to the driver
val accumulatorUpdates: Seq[AccumulableInfo] =
- if (task != null) {
- task.metrics.foreach { m =>
- m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
- m.setJvmGCTime(computeTotalGcTime() - startGCTime)
+ if (task != null) {
+ task.metrics.foreach { m =>
+ m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+ m.setJvmGCTime(computeTotalGcTime() - startGCTime)
+ }
+ task.collectAccumulatorUpdates(taskFailed = true)
+ } else {
+ Seq.empty[AccumulableInfo]
}
- task.collectAccumulatorUpdates(taskFailed = true)
- } else {
- Seq.empty[AccumulableInfo]
- }
val serializedTaskEndReason = {
try {
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0a6ebcb3e0..8ff0620f83 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -45,13 +45,12 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* these requirements.
*/
@DeveloperApi
-class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
-
+class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable {
import InternalAccumulator._
// Needed for Java tests
def this() {
- this(InternalAccumulator.create())
+ this(InternalAccumulator.createAll())
}
/**
@@ -144,6 +143,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
}
+ @deprecated("setting updated blocks is not allowed", "2.0.0")
+ def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
+ blocks.foreach(setUpdatedBlockStatuses)
+ }
+
// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
_executorDeserializeTime.setValue(v)
@@ -220,6 +224,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
*/
def outputMetrics: Option[OutputMetrics] = _outputMetrics
+ @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
+ def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
+ _outputMetrics = om
+ }
+
/**
* Get or create a new [[OutputMetrics]] associated with this task.
*/
@@ -296,6 +305,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
+ @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
+ def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
+ _shuffleWriteMetrics = swm
+ }
+
/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 885f70e89f..cd2736e196 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -49,7 +49,7 @@ private[spark] class ResultTask[T, U](
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
- _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
+ _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 7b09c2eded..0a45ef5283 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -61,6 +61,7 @@ case class SparkListenerTaskEnd(
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
+ // may be null if the task has failed
@Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 29c5ff0b5c..0b68b88566 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -408,9 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)
- val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.peakExecutionMemory.toDouble
- }
+ val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.peakExecutionMemory.toDouble
+ }
val peakExecutionMemoryQuantiles = {
<td>
<span data-toggle="tooltip"
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 38e6478d80..09d955300a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -236,7 +236,7 @@ private[spark] object JsonProtocol {
val accumUpdates = metricsUpdate.accumUpdates
("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
("Executor ID" -> execId) ~
- ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
+ ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Task ID" -> taskId) ~
("Stage ID" -> stageId) ~
("Stage Attempt ID" -> stageAttemptId) ~
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index e0fdd45973..4d49fe5159 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -268,7 +268,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
val externalAccums = Seq(acc1, acc2)
- val internalAccums = InternalAccumulator.create()
+ val internalAccums = InternalAccumulator.createAll()
// Set some values; these should not be observed later on the "executors"
acc1.setValue(10)
acc2.setValue(20L)
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 44a16e26f4..c426bb7a4e 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -87,7 +87,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
test("create") {
- val accums = create()
+ val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -123,7 +123,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
test("naming") {
- val accums = create()
+ val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -291,7 +291,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
assert(Accumulators.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
- val internalAccums = InternalAccumulator.create()
+ val internalAccums = InternalAccumulator.createAll()
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
assert(Accumulators.originals.size === internalAccums.size * 2)
val accumsRegistered = sc.cleaner match {
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 67c4595ed1..3a1a67cdc0 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -31,7 +31,7 @@ class TaskMetricsSuite extends SparkFunSuite {
import TaskMetricsSuite._
test("create") {
- val internalAccums = InternalAccumulator.create()
+ val internalAccums = InternalAccumulator.createAll()
val tm1 = new TaskMetrics
val tm2 = new TaskMetrics(internalAccums)
assert(tm1.accumulatorUpdates().size === internalAccums.size)
@@ -51,7 +51,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("create with unnamed accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
- InternalAccumulator.create() ++ Seq(
+ InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, None, internal = true)))
}
}
@@ -59,7 +59,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("create with duplicate name accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
- InternalAccumulator.create() ++ Seq(
+ InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
}
}
@@ -67,7 +67,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("create with external accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
- InternalAccumulator.create() ++ Seq(
+ InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, Some("x"))))
}
}
@@ -131,7 +131,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}
test("mutating values") {
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
// initial values
assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
@@ -180,7 +180,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("mutating shuffle read metrics values") {
import shuffleRead._
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
@@ -234,7 +234,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("mutating shuffle write metrics values") {
import shuffleWrite._
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
@@ -267,7 +267,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("mutating input metrics values") {
import input._
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
@@ -296,7 +296,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("mutating output metrics values") {
import output._
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
@@ -381,7 +381,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}
test("additional accumulables") {
- val internalAccums = InternalAccumulator.create()
+ val internalAccums = InternalAccumulator.createAll()
val tm = new TaskMetrics(internalAccums)
assert(tm.accumulatorUpdates().size === internalAccums.size)
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
@@ -419,7 +419,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("existing values in shuffle read accums") {
// set shuffle read accum before passing it into TaskMetrics
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
assert(srAccum.isDefined)
srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -432,7 +432,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("existing values in shuffle write accums") {
// set shuffle write accum before passing it into TaskMetrics
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
assert(swAccum.isDefined)
swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -445,7 +445,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("existing values in input accums") {
// set input accum before passing it into TaskMetrics
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
assert(inAccum.isDefined)
inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -458,7 +458,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("existing values in output accums") {
// set output accum before passing it into TaskMetrics
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
assert(outAccum.isDefined)
outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -470,7 +470,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}
test("from accumulator updates") {
- val accumUpdates1 = InternalAccumulator.create().map { a =>
+ val accumUpdates1 = InternalAccumulator.createAll().map { a =>
AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
}
val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index b3bb86db10..850e470ca1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -127,7 +127,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val param = AccumulatorParam.LongAccumulatorParam
val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
- val initialAccums = InternalAccumulator.create()
+ val initialAccums = InternalAccumulator.createAll()
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 18a16a25bf..9876bded33 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"
def makeTaskMetrics(base: Int): TaskMetrics = {
- val accums = InternalAccumulator.create()
+ val accums = InternalAccumulator.createAll()
accums.foreach(Accumulators.register)
val taskMetrics = new TaskMetrics(accums)
val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 48951c3168..de6f408fa8 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -508,7 +508,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
/** -------------------------------- *
| Util methods for comparing events |
- * --------------------------------- */
+ * --------------------------------- */
private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
(event1, event2) match {
@@ -773,7 +773,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
/** ----------------------------------- *
| Util methods for constructing events |
- * ------------------------------------ */
+ * ------------------------------------ */
private val properties = {
val p = new Properties
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 8b1a7303fc..9209094385 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -187,7 +187,8 @@ object MimaExcludes {
) ++ Seq(
// SPARK-12896 Send only accumulator updates to driver, not TaskMetrics
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this")
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.Accumulator.initialValue")
) ++ Seq(
// SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),