author     Andrew Or <andrew@databricks.com>        2016-01-27 11:15:48 -0800
committer  Josh Rosen <joshrosen@databricks.com>    2016-01-27 11:15:48 -0800
commit     87abcf7df921a5937fdb2bae8bfb30bfabc4970a (patch)
tree       74b5a1cb19f06c40bd99a85feee3f35efbb9a496 /core/src/test/scala
parent     edd473751b59b55fa3daede5ed7bc19ea8bd7170 (diff)
[SPARK-12895][SPARK-12896] Migrate TaskMetrics to accumulators
The high-level idea is that instead of having the executors send both accumulator updates and TaskMetrics, we should have them send only accumulator updates. This eliminates the need to maintain both code paths since one can be implemented in terms of the other. This effort is split into two parts:

**SPARK-12895: Implement TaskMetrics using accumulators.** TaskMetrics is basically just a bunch of accumulable fields. This patch makes TaskMetrics a syntactic wrapper around a collection of accumulators so we don't need to send TaskMetrics from the executors to the driver.

**SPARK-12896: Send only accumulator updates to the driver.** Now that TaskMetrics are expressed in terms of accumulators, we can capture all TaskMetrics values if we just send accumulator updates from the executors to the driver. This completes the parent issue SPARK-10620.

While an effort has been made to preserve as much of the public API as possible, there were a few known breaking DeveloperApi changes that would be very awkward to maintain. I will gather the full list shortly and post it here.

Note: This was once part of #10717. This patch is split out into its own patch from there to make it easier for others to review. Other smaller pieces have already been merged into master.

Author: Andrew Or <andrew@databricks.com>

Closes #10835 from andrewor14/task-metrics-use-accums.
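For readers skimming the diff below, here is a minimal, self-contained sketch of the idea described above: TaskMetrics becomes a thin wrapper around a collection of named accumulators, and the executor reports only accumulator updates back to the driver. All names in this sketch (MiniAccumulator, MiniTaskMetrics, AccumUpdate) are invented for illustration and are not the actual Spark API.

```scala
// Illustrative sketch only -- the types below are made up for this example
// and do not match the real Spark classes touched by this patch.
object TaskMetricsSketch {

  // A named, long-valued accumulator that a task mutates locally.
  final class MiniAccumulator(val id: Long, val name: String, var localValue: Long = 0L) {
    def add(v: Long): Unit = localValue += v
  }

  // What the executor ships to the driver: just (id, name, update) triples.
  final case class AccumUpdate(id: Long, name: String, update: Long)

  // "TaskMetrics" as a syntactic wrapper over a collection of accumulators.
  final class MiniTaskMetrics(accums: Seq[MiniAccumulator]) {
    private val byName = accums.map(a => a.name -> a).toMap
    def incExecutorRunTime(ms: Long): Unit = byName("executorRunTime").add(ms)
    def incResultSize(bytes: Long): Unit = byName("resultSize").add(bytes)
    // Instead of serializing a TaskMetrics object, report accumulator updates.
    def accumulatorUpdates(): Seq[AccumUpdate] =
      accums.map(a => AccumUpdate(a.id, a.name, a.localValue))
  }

  def main(args: Array[String]): Unit = {
    val accums = Seq(
      new MiniAccumulator(1L, "executorRunTime"),
      new MiniAccumulator(2L, "resultSize"))
    val tm = new MiniTaskMetrics(accums)
    tm.incExecutorRunTime(42L)
    tm.incResultSize(1024L)
    // The driver can reconstruct metric values from the updates alone.
    tm.accumulatorUpdates().foreach(println)
  }
}
```

In the real patch the driver rebuilds a TaskMetrics view from such updates; the tests below exercise that path through `TaskMetrics.fromAccumulatorUpdates` and the `accumulatorUpdates()` assertions in TaskMetricsSuite.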
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 324
-rw-r--r-- core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 6
-rw-r--r-- core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala | 331
-rw-r--r-- core/src/test/scala/org/apache/spark/SparkFunSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala | 540
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala | 3
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 281
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 8
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 56
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 67
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 36
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala | 11
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 26
-rw-r--r-- core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 515
15 files changed, 1727 insertions, 481 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 5b84acf40b..11c97d7d9a 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -17,18 +17,22 @@
package org.apache.spark
+import javax.annotation.concurrent.GuardedBy
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.ref.WeakReference
+import scala.util.control.NonFatal
import org.scalatest.Matchers
import org.scalatest.exceptions.TestFailedException
import org.apache.spark.scheduler._
+import org.apache.spark.serializer.JavaSerializer
class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
- import InternalAccumulator._
+ import AccumulatorParam._
implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
new AccumulableParam[mutable.Set[A], A] {
@@ -59,7 +63,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
longAcc.value should be (210L + maxInt * 20)
}
- test ("value not assignable from tasks") {
+ test("value not assignable from tasks") {
sc = new SparkContext("local", "test")
val acc : Accumulator[Int] = sc.accumulator(0)
@@ -84,7 +88,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
}
- test ("value not readable in tasks") {
+ test("value not readable in tasks") {
val maxI = 1000
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
@@ -159,193 +163,157 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
assert(!Accumulators.originals.get(accId).isDefined)
}
- test("internal accumulators in TaskContext") {
+ test("get accum") {
sc = new SparkContext("local", "test")
- val accums = InternalAccumulator.create(sc)
- val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
- val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
- val collectedInternalAccums = taskContext.collectInternalAccumulators()
- val collectedAccums = taskContext.collectAccumulators()
- assert(internalMetricsToAccums.size > 0)
- assert(internalMetricsToAccums.values.forall(_.isInternal))
- assert(internalMetricsToAccums.contains(TEST_ACCUMULATOR))
- val testAccum = internalMetricsToAccums(TEST_ACCUMULATOR)
- assert(collectedInternalAccums.size === internalMetricsToAccums.size)
- assert(collectedInternalAccums.size === collectedAccums.size)
- assert(collectedInternalAccums.contains(testAccum.id))
- assert(collectedAccums.contains(testAccum.id))
- }
+ // Don't register with SparkContext for cleanup
+ var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val accId = acc.id
+ val ref = WeakReference(acc)
+ assert(ref.get.isDefined)
+ Accumulators.register(ref.get.get)
- test("internal accumulators in a stage") {
- val listener = new SaveInfoListener
- val numPartitions = 10
- sc = new SparkContext("local", "test")
- sc.addSparkListener(listener)
- // Have each task add 1 to the internal accumulator
- val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- val stageInfos = listener.getCompletedStageInfos
- val taskInfos = listener.getCompletedTaskInfos
- assert(stageInfos.size === 1)
- assert(taskInfos.size === numPartitions)
- // The accumulator values should be merged in the stage
- val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
- assert(stageAccum.value.toLong === numPartitions)
- // The accumulator should be updated locally on each task
- val taskAccumValues = taskInfos.map { taskInfo =>
- val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
- assert(taskAccum.update.isDefined)
- assert(taskAccum.update.get.toLong === 1)
- taskAccum.value.toLong
- }
- // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
- assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ // Remove the explicit reference to it and allow weak reference to get garbage collected
+ acc = null
+ System.gc()
+ assert(ref.get.isEmpty)
+
+ // Getting a garbage collected accum should throw error
+ intercept[IllegalAccessError] {
+ Accumulators.get(accId)
}
- rdd.count()
+
+ // Getting a normal accumulator. Note: this has to be separate because referencing an
+ // accumulator above in an `assert` would keep it from being garbage collected.
+ val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true)
+ Accumulators.register(acc2)
+ assert(Accumulators.get(acc2.id) === Some(acc2))
+
+ // Getting an accumulator that does not exist should return None
+ assert(Accumulators.get(100000).isEmpty)
}
- test("internal accumulators in multiple stages") {
- val listener = new SaveInfoListener
- val numPartitions = 10
- sc = new SparkContext("local", "test")
- sc.addSparkListener(listener)
- // Each stage creates its own set of internal accumulators so the
- // values for the same metric should not be mixed up across stages
- val rdd = sc.parallelize(1 to 100, numPartitions)
- .map { i => (i, i) }
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- iter
- }
- .reduceByKey { case (x, y) => x + y }
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 10
- iter
- }
- .repartition(numPartitions * 2)
- .mapPartitions { iter =>
- TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 100
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- // We ran 3 stages, and the accumulator values should be distinct
- val stageInfos = listener.getCompletedStageInfos
- assert(stageInfos.size === 3)
- val (firstStageAccum, secondStageAccum, thirdStageAccum) =
- (findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR),
- findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR),
- findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR))
- assert(firstStageAccum.value.toLong === numPartitions)
- assert(secondStageAccum.value.toLong === numPartitions * 10)
- assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
- }
- rdd.count()
+ test("only external accums are automatically registered") {
+ val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false)
+ val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true)
+ assert(!accEx.isInternal)
+ assert(accIn.isInternal)
+ assert(Accumulators.get(accEx.id).isDefined)
+ assert(Accumulators.get(accIn.id).isEmpty)
}
- test("internal accumulators in fully resubmitted stages") {
- testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks
+ test("copy") {
+ val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false)
+ val acc2 = acc1.copy()
+ assert(acc1.id === acc2.id)
+ assert(acc1.value === acc2.value)
+ assert(acc1.name === acc2.name)
+ assert(acc1.isInternal === acc2.isInternal)
+ assert(acc1.countFailedValues === acc2.countFailedValues)
+ assert(acc1 !== acc2)
+ // Modifying one does not affect the other
+ acc1.add(44L)
+ assert(acc1.value === 500L)
+ assert(acc2.value === 456L)
+ acc2.add(144L)
+ assert(acc1.value === 500L)
+ assert(acc2.value === 600L)
}
- test("internal accumulators in partially resubmitted stages") {
- testInternalAccumulatorsWithFailedTasks((i: Int) => i % 2 == 0) // fail a subset
+ test("register multiple accums with same ID") {
+ // Make sure these are internal accums so we don't automatically register them already
+ val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val acc2 = acc1.copy()
+ assert(acc1 !== acc2)
+ assert(acc1.id === acc2.id)
+ assert(Accumulators.originals.isEmpty)
+ assert(Accumulators.get(acc1.id).isEmpty)
+ Accumulators.register(acc1)
+ Accumulators.register(acc2)
+ // The second one does not override the first one
+ assert(Accumulators.originals.size === 1)
+ assert(Accumulators.get(acc1.id) === Some(acc1))
}
- /**
- * Return the accumulable info that matches the specified name.
- */
- private def findAccumulableInfo(
- accums: Iterable[AccumulableInfo],
- name: String): AccumulableInfo = {
- accums.find { a => a.name == name }.getOrElse {
- throw new TestFailedException(s"internal accumulator '$name' not found", 0)
- }
+ test("string accumulator param") {
+ val acc = new Accumulator("", StringAccumulatorParam, Some("darkness"))
+ assert(acc.value === "")
+ acc.setValue("feeds")
+ assert(acc.value === "feeds")
+ acc.add("your")
+ assert(acc.value === "your") // value is overwritten, not concatenated
+ acc += "soul"
+ assert(acc.value === "soul")
+ acc ++= "with"
+ assert(acc.value === "with")
+ acc.merge("kindness")
+ assert(acc.value === "kindness")
}
- /**
- * Test whether internal accumulators are merged properly if some tasks fail.
- */
- private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = {
- val listener = new SaveInfoListener
- val numPartitions = 10
- val numFailedPartitions = (0 until numPartitions).count(failCondition)
- // This says use 1 core and retry tasks up to 2 times
- sc = new SparkContext("local[1, 2]", "test")
- sc.addSparkListener(listener)
- val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
- val taskContext = TaskContext.get()
- taskContext.internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
- // Fail the first attempts of a subset of the tasks
- if (failCondition(i) && taskContext.attemptNumber() == 0) {
- throw new Exception("Failing a task intentionally.")
- }
- iter
- }
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { _ =>
- val stageInfos = listener.getCompletedStageInfos
- val taskInfos = listener.getCompletedTaskInfos
- assert(stageInfos.size === 1)
- assert(taskInfos.size === numPartitions + numFailedPartitions)
- val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
- // We should not double count values in the merged accumulator
- assert(stageAccum.value.toLong === numPartitions)
- val taskAccumValues = taskInfos.flatMap { taskInfo =>
- if (!taskInfo.failed) {
- // If a task succeeded, its update value should always be 1
- val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
- assert(taskAccum.update.isDefined)
- assert(taskAccum.update.get.toLong === 1)
- Some(taskAccum.value.toLong)
- } else {
- // If a task failed, we should not get its accumulator values
- assert(taskInfo.accumulables.isEmpty)
- None
- }
- }
- assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
- }
- rdd.count()
+ test("list accumulator param") {
+ val acc = new Accumulator(Seq.empty[Int], new ListAccumulatorParam[Int], Some("numbers"))
+ assert(acc.value === Seq.empty[Int])
+ acc.add(Seq(1, 2))
+ assert(acc.value === Seq(1, 2))
+ acc += Seq(3, 4)
+ assert(acc.value === Seq(1, 2, 3, 4))
+ acc ++= Seq(5, 6)
+ assert(acc.value === Seq(1, 2, 3, 4, 5, 6))
+ acc.merge(Seq(7, 8))
+ assert(acc.value === Seq(1, 2, 3, 4, 5, 6, 7, 8))
+ acc.setValue(Seq(9, 10))
+ assert(acc.value === Seq(9, 10))
+ }
+
+ test("value is reset on the executors") {
+ val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
+ val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
+ val externalAccums = Seq(acc1, acc2)
+ val internalAccums = InternalAccumulator.create()
+ // Set some values; these should not be observed later on the "executors"
+ acc1.setValue(10)
+ acc2.setValue(20L)
+ internalAccums
+ .find(_.name == Some(InternalAccumulator.TEST_ACCUM))
+ .get.asInstanceOf[Accumulator[Long]]
+ .setValue(30L)
+ // Simulate the task being serialized and sent to the executors.
+ val dummyTask = new DummyTask(internalAccums, externalAccums)
+ val serInstance = new JavaSerializer(new SparkConf).newInstance()
+ val taskSer = Task.serializeWithDependencies(
+ dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
+ // Now we're on the executors.
+ // Deserialize the task and assert that its accumulators are zero'ed out.
+ val (_, _, taskBytes) = Task.deserializeWithDependencies(taskSer)
+ val taskDeser = serInstance.deserialize[DummyTask](
+ taskBytes, Thread.currentThread.getContextClassLoader)
+ // Assert that executors see only zeros
+ taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
+ taskDeser.internalAccums.foreach { a => assert(a.localValue == a.zero) }
}
}
private[spark] object AccumulatorSuite {
+ import InternalAccumulator._
+
/**
- * Run one or more Spark jobs and verify that the peak execution memory accumulator
- * is updated afterwards.
+ * Run one or more Spark jobs and verify that in at least one job the peak execution memory
+ * accumulator is updated afterwards.
*/
def verifyPeakExecutionMemorySet(
sc: SparkContext,
testName: String)(testBody: => Unit): Unit = {
val listener = new SaveInfoListener
sc.addSparkListener(listener)
- // Register asserts in job completion callback to avoid flakiness
- listener.registerJobCompletionCallback { jobId =>
- if (jobId == 0) {
- // The first job is a dummy one to verify that the accumulator does not already exist
- val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
- assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
- } else {
- // In the subsequent jobs, verify that peak execution memory is updated
- val accum = listener.getCompletedStageInfos
- .flatMap(_.accumulables.values)
- .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
- .getOrElse {
- throw new TestFailedException(
- s"peak execution memory accumulator not set in '$testName'", 0)
- }
- assert(accum.value.toLong > 0)
- }
- }
- // Run the jobs
- sc.parallelize(1 to 10).count()
testBody
+ val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
+ val isSet = accums.exists { a =>
+ a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
+ }
+ if (!isSet) {
+ throw new TestFailedException(s"peak execution memory accumulator not set in '$testName'", 0)
+ }
}
}
@@ -357,6 +325,10 @@ private class SaveInfoListener extends SparkListener {
private val completedTaskInfos: ArrayBuffer[TaskInfo] = new ArrayBuffer[TaskInfo]
private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID
+ // Accesses must be synchronized to ensure failures in `jobCompletionCallback` are propagated
+ @GuardedBy("this")
+ private var exception: Throwable = null
+
def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.toArray.toSeq
@@ -365,9 +337,20 @@ private class SaveInfoListener extends SparkListener {
jobCompletionCallback = callback
}
- override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ /** Throw a stored exception, if any. */
+ def maybeThrowException(): Unit = synchronized {
+ if (exception != null) { throw exception }
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
if (jobCompletionCallback != null) {
- jobCompletionCallback(jobEnd.jobId)
+ try {
+ jobCompletionCallback(jobEnd.jobId)
+ } catch {
+ // Store any exception thrown here so we can throw them later in the main thread.
+ // Otherwise, if `jobCompletionCallback` threw something it wouldn't fail the test.
+ case NonFatal(e) => exception = e
+ }
}
}
@@ -379,3 +362,14 @@ private class SaveInfoListener extends SparkListener {
completedTaskInfos += taskEnd.taskInfo
}
}
+
+
+/**
+ * A dummy [[Task]] that contains internal and external [[Accumulator]]s.
+ */
+private[spark] class DummyTask(
+ val internalAccums: Seq[Accumulator[_]],
+ val externalAccums: Seq[Accumulator[_]])
+ extends Task[Int](0, 0, 0, internalAccums) {
+ override def runTask(c: TaskContext): Int = 1
+}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 4e678fbac6..80a1de6065 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -801,7 +801,7 @@ class ExecutorAllocationManagerSuite
assert(maxNumExecutorsNeeded(manager) === 1)
// If the task is failed, we expect it to be resubmitted later.
- val taskEndReason = ExceptionFailure(null, null, null, null, null, None)
+ val taskEndReason = ExceptionFailure(null, null, null, null, None)
sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
assert(maxNumExecutorsNeeded(manager) === 1)
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index c7f629a14b..3777d77f8f 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -215,14 +215,16 @@ class HeartbeatReceiverSuite
val metrics = new TaskMetrics
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
- Heartbeat(executorId, Array(1L -> metrics), blockManagerId))
+ Heartbeat(executorId, Array(1L -> metrics.accumulatorUpdates()), blockManagerId))
if (executorShouldReregister) {
assert(response.reregisterBlockManager)
} else {
assert(!response.reregisterBlockManager)
// Additionally verify that the scheduler callback is called with the correct parameters
verify(scheduler).executorHeartbeatReceived(
- Matchers.eq(executorId), Matchers.eq(Array(1L -> metrics)), Matchers.eq(blockManagerId))
+ Matchers.eq(executorId),
+ Matchers.eq(Array(1L -> metrics.accumulatorUpdates())),
+ Matchers.eq(blockManagerId))
}
}
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
new file mode 100644
index 0000000000..630b46f828
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.storage.{BlockId, BlockStatus}
+
+
+class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
+ import InternalAccumulator._
+ import AccumulatorParam._
+
+ test("get param") {
+ assert(getParam(EXECUTOR_DESERIALIZE_TIME) === LongAccumulatorParam)
+ assert(getParam(EXECUTOR_RUN_TIME) === LongAccumulatorParam)
+ assert(getParam(RESULT_SIZE) === LongAccumulatorParam)
+ assert(getParam(JVM_GC_TIME) === LongAccumulatorParam)
+ assert(getParam(RESULT_SERIALIZATION_TIME) === LongAccumulatorParam)
+ assert(getParam(MEMORY_BYTES_SPILLED) === LongAccumulatorParam)
+ assert(getParam(DISK_BYTES_SPILLED) === LongAccumulatorParam)
+ assert(getParam(PEAK_EXECUTION_MEMORY) === LongAccumulatorParam)
+ assert(getParam(UPDATED_BLOCK_STATUSES) === UpdatedBlockStatusesAccumulatorParam)
+ assert(getParam(TEST_ACCUM) === LongAccumulatorParam)
+ // shuffle read
+ assert(getParam(shuffleRead.REMOTE_BLOCKS_FETCHED) === IntAccumulatorParam)
+ assert(getParam(shuffleRead.LOCAL_BLOCKS_FETCHED) === IntAccumulatorParam)
+ assert(getParam(shuffleRead.REMOTE_BYTES_READ) === LongAccumulatorParam)
+ assert(getParam(shuffleRead.LOCAL_BYTES_READ) === LongAccumulatorParam)
+ assert(getParam(shuffleRead.FETCH_WAIT_TIME) === LongAccumulatorParam)
+ assert(getParam(shuffleRead.RECORDS_READ) === LongAccumulatorParam)
+ // shuffle write
+ assert(getParam(shuffleWrite.BYTES_WRITTEN) === LongAccumulatorParam)
+ assert(getParam(shuffleWrite.RECORDS_WRITTEN) === LongAccumulatorParam)
+ assert(getParam(shuffleWrite.WRITE_TIME) === LongAccumulatorParam)
+ // input
+ assert(getParam(input.READ_METHOD) === StringAccumulatorParam)
+ assert(getParam(input.RECORDS_READ) === LongAccumulatorParam)
+ assert(getParam(input.BYTES_READ) === LongAccumulatorParam)
+ // output
+ assert(getParam(output.WRITE_METHOD) === StringAccumulatorParam)
+ assert(getParam(output.RECORDS_WRITTEN) === LongAccumulatorParam)
+ assert(getParam(output.BYTES_WRITTEN) === LongAccumulatorParam)
+ // default to Long
+ assert(getParam(METRICS_PREFIX + "anything") === LongAccumulatorParam)
+ intercept[IllegalArgumentException] {
+ getParam("something that does not start with the right prefix")
+ }
+ }
+
+ test("create by name") {
+ val executorRunTime = create(EXECUTOR_RUN_TIME)
+ val updatedBlockStatuses = create(UPDATED_BLOCK_STATUSES)
+ val shuffleRemoteBlocksRead = create(shuffleRead.REMOTE_BLOCKS_FETCHED)
+ val inputReadMethod = create(input.READ_METHOD)
+ assert(executorRunTime.name === Some(EXECUTOR_RUN_TIME))
+ assert(updatedBlockStatuses.name === Some(UPDATED_BLOCK_STATUSES))
+ assert(shuffleRemoteBlocksRead.name === Some(shuffleRead.REMOTE_BLOCKS_FETCHED))
+ assert(inputReadMethod.name === Some(input.READ_METHOD))
+ assert(executorRunTime.value.isInstanceOf[Long])
+ assert(updatedBlockStatuses.value.isInstanceOf[Seq[_]])
+ // We cannot assert the type of the value directly since the type parameter is erased.
+ // Instead, try casting a `Seq` of expected type and see if it fails in run time.
+ updatedBlockStatuses.setValueAny(Seq.empty[(BlockId, BlockStatus)])
+ assert(shuffleRemoteBlocksRead.value.isInstanceOf[Int])
+ assert(inputReadMethod.value.isInstanceOf[String])
+ // default to Long
+ val anything = create(METRICS_PREFIX + "anything")
+ assert(anything.value.isInstanceOf[Long])
+ }
+
+ test("create") {
+ val accums = create()
+ val shuffleReadAccums = createShuffleReadAccums()
+ val shuffleWriteAccums = createShuffleWriteAccums()
+ val inputAccums = createInputAccums()
+ val outputAccums = createOutputAccums()
+ // assert they're all internal
+ assert(accums.forall(_.isInternal))
+ assert(shuffleReadAccums.forall(_.isInternal))
+ assert(shuffleWriteAccums.forall(_.isInternal))
+ assert(inputAccums.forall(_.isInternal))
+ assert(outputAccums.forall(_.isInternal))
+ // assert they all count on failures
+ assert(accums.forall(_.countFailedValues))
+ assert(shuffleReadAccums.forall(_.countFailedValues))
+ assert(shuffleWriteAccums.forall(_.countFailedValues))
+ assert(inputAccums.forall(_.countFailedValues))
+ assert(outputAccums.forall(_.countFailedValues))
+ // assert they all have names
+ assert(accums.forall(_.name.isDefined))
+ assert(shuffleReadAccums.forall(_.name.isDefined))
+ assert(shuffleWriteAccums.forall(_.name.isDefined))
+ assert(inputAccums.forall(_.name.isDefined))
+ assert(outputAccums.forall(_.name.isDefined))
+ // assert `accums` is a strict superset of the others
+ val accumNames = accums.map(_.name.get).toSet
+ val shuffleReadAccumNames = shuffleReadAccums.map(_.name.get).toSet
+ val shuffleWriteAccumNames = shuffleWriteAccums.map(_.name.get).toSet
+ val inputAccumNames = inputAccums.map(_.name.get).toSet
+ val outputAccumNames = outputAccums.map(_.name.get).toSet
+ assert(shuffleReadAccumNames.subsetOf(accumNames))
+ assert(shuffleWriteAccumNames.subsetOf(accumNames))
+ assert(inputAccumNames.subsetOf(accumNames))
+ assert(outputAccumNames.subsetOf(accumNames))
+ }
+
+ test("naming") {
+ val accums = create()
+ val shuffleReadAccums = createShuffleReadAccums()
+ val shuffleWriteAccums = createShuffleWriteAccums()
+ val inputAccums = createInputAccums()
+ val outputAccums = createOutputAccums()
+ // assert that prefixes are properly namespaced
+ assert(SHUFFLE_READ_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(SHUFFLE_WRITE_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(INPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(OUTPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX))
+ assert(accums.forall(_.name.get.startsWith(METRICS_PREFIX)))
+ // assert they all start with the expected prefixes
+ assert(shuffleReadAccums.forall(_.name.get.startsWith(SHUFFLE_READ_METRICS_PREFIX)))
+ assert(shuffleWriteAccums.forall(_.name.get.startsWith(SHUFFLE_WRITE_METRICS_PREFIX)))
+ assert(inputAccums.forall(_.name.get.startsWith(INPUT_METRICS_PREFIX)))
+ assert(outputAccums.forall(_.name.get.startsWith(OUTPUT_METRICS_PREFIX)))
+ }
+
+ test("internal accumulators in TaskContext") {
+ val taskContext = TaskContext.empty()
+ val accumUpdates = taskContext.taskMetrics.accumulatorUpdates()
+ assert(accumUpdates.size > 0)
+ assert(accumUpdates.forall(_.internal))
+ val testAccum = taskContext.taskMetrics.getAccum(TEST_ACCUM)
+ assert(accumUpdates.exists(_.id == testAccum.id))
+ }
+
+ test("internal accumulators in a stage") {
+ val listener = new SaveInfoListener
+ val numPartitions = 10
+ sc = new SparkContext("local", "test")
+ sc.addSparkListener(listener)
+ // Have each task add 1 to the internal accumulator
+ val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1
+ iter
+ }
+ // Register asserts in job completion callback to avoid flakiness
+ listener.registerJobCompletionCallback { _ =>
+ val stageInfos = listener.getCompletedStageInfos
+ val taskInfos = listener.getCompletedTaskInfos
+ assert(stageInfos.size === 1)
+ assert(taskInfos.size === numPartitions)
+ // The accumulator values should be merged in the stage
+ val stageAccum = findTestAccum(stageInfos.head.accumulables.values)
+ assert(stageAccum.value.get.asInstanceOf[Long] === numPartitions)
+ // The accumulator should be updated locally on each task
+ val taskAccumValues = taskInfos.map { taskInfo =>
+ val taskAccum = findTestAccum(taskInfo.accumulables)
+ assert(taskAccum.update.isDefined)
+ assert(taskAccum.update.get.asInstanceOf[Long] === 1L)
+ taskAccum.value.get.asInstanceOf[Long]
+ }
+ // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
+ assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ }
+ rdd.count()
+ }
+
+ test("internal accumulators in multiple stages") {
+ val listener = new SaveInfoListener
+ val numPartitions = 10
+ sc = new SparkContext("local", "test")
+ sc.addSparkListener(listener)
+ // Each stage creates its own set of internal accumulators so the
+ // values for the same metric should not be mixed up across stages
+ val rdd = sc.parallelize(1 to 100, numPartitions)
+ .map { i => (i, i) }
+ .mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1
+ iter
+ }
+ .reduceByKey { case (x, y) => x + y }
+ .mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 10
+ iter
+ }
+ .repartition(numPartitions * 2)
+ .mapPartitions { iter =>
+ TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 100
+ iter
+ }
+ // Register asserts in job completion callback to avoid flakiness
+ listener.registerJobCompletionCallback { _ =>
+ // We ran 3 stages, and the accumulator values should be distinct
+ val stageInfos = listener.getCompletedStageInfos
+ assert(stageInfos.size === 3)
+ val (firstStageAccum, secondStageAccum, thirdStageAccum) =
+ (findTestAccum(stageInfos(0).accumulables.values),
+ findTestAccum(stageInfos(1).accumulables.values),
+ findTestAccum(stageInfos(2).accumulables.values))
+ assert(firstStageAccum.value.get.asInstanceOf[Long] === numPartitions)
+ assert(secondStageAccum.value.get.asInstanceOf[Long] === numPartitions * 10)
+ assert(thirdStageAccum.value.get.asInstanceOf[Long] === numPartitions * 2 * 100)
+ }
+ rdd.count()
+ }
+
+ // TODO: these two tests are incorrect; they don't actually trigger stage retries.
+ ignore("internal accumulators in fully resubmitted stages") {
+ testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks
+ }
+
+ ignore("internal accumulators in partially resubmitted stages") {
+ testInternalAccumulatorsWithFailedTasks((i: Int) => i % 2 == 0) // fail a subset
+ }
+
+ test("internal accumulators are registered for cleanups") {
+ sc = new SparkContext("local", "test") {
+ private val myCleaner = new SaveAccumContextCleaner(this)
+ override def cleaner: Option[ContextCleaner] = Some(myCleaner)
+ }
+ assert(Accumulators.originals.isEmpty)
+ sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
+ val internalAccums = InternalAccumulator.create()
+ // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
+ assert(Accumulators.originals.size === internalAccums.size * 2)
+ val accumsRegistered = sc.cleaner match {
+ case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
+ case _ => Seq.empty[Long]
+ }
+ // Make sure the same set of accumulators is registered for cleanup
+ assert(accumsRegistered.size === internalAccums.size * 2)
+ assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet)
+ }
+
+ /**
+ * Return the accumulable info that matches the specified name.
+ */
+ private def findTestAccum(accums: Iterable[AccumulableInfo]): AccumulableInfo = {
+ accums.find { a => a.name == Some(TEST_ACCUM) }.getOrElse {
+ fail(s"unable to find internal accumulator called $TEST_ACCUM")
+ }
+ }
+
+ /**
+ * Test whether internal accumulators are merged properly if some tasks fail.
+ * TODO: make this actually retry the stage.
+ */
+ private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = {
+ val listener = new SaveInfoListener
+ val numPartitions = 10
+ val numFailedPartitions = (0 until numPartitions).count(failCondition)
+ // This says use 1 core and retry tasks up to 2 times
+ sc = new SparkContext("local[1, 2]", "test")
+ sc.addSparkListener(listener)
+ val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
+ val taskContext = TaskContext.get()
+ taskContext.taskMetrics().getAccum(TEST_ACCUM) += 1
+ // Fail the first attempts of a subset of the tasks
+ if (failCondition(i) && taskContext.attemptNumber() == 0) {
+ throw new Exception("Failing a task intentionally.")
+ }
+ iter
+ }
+ // Register asserts in job completion callback to avoid flakiness
+ listener.registerJobCompletionCallback { _ =>
+ val stageInfos = listener.getCompletedStageInfos
+ val taskInfos = listener.getCompletedTaskInfos
+ assert(stageInfos.size === 1)
+ assert(taskInfos.size === numPartitions + numFailedPartitions)
+ val stageAccum = findTestAccum(stageInfos.head.accumulables.values)
+ // If all partitions failed, then we would resubmit the whole stage again and create a
+ // fresh set of internal accumulators. Otherwise, these internal accumulators do count
+ // failed values, so we must include the failed values.
+ val expectedAccumValue =
+ if (numPartitions == numFailedPartitions) {
+ numPartitions
+ } else {
+ numPartitions + numFailedPartitions
+ }
+ assert(stageAccum.value.get.asInstanceOf[Long] === expectedAccumValue)
+ val taskAccumValues = taskInfos.flatMap { taskInfo =>
+ if (!taskInfo.failed) {
+ // If a task succeeded, its update value should always be 1
+ val taskAccum = findTestAccum(taskInfo.accumulables)
+ assert(taskAccum.update.isDefined)
+ assert(taskAccum.update.get.asInstanceOf[Long] === 1L)
+ assert(taskAccum.value.isDefined)
+ Some(taskAccum.value.get.asInstanceOf[Long])
+ } else {
+ // If a task failed, we should not get its accumulator values
+ assert(taskInfo.accumulables.isEmpty)
+ None
+ }
+ }
+ assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ }
+ rdd.count()
+ listener.maybeThrowException()
+ }
+
+ /**
+ * A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup.
+ */
+ private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
+ private val accumsRegistered = new ArrayBuffer[Long]
+
+ override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+ accumsRegistered += a.id
+ super.registerAccumulatorForCleanup(a)
+ }
+
+ def accumsRegisteredForCleanup: Seq[Long] = accumsRegistered.toArray
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 9be9db01c7..d3359c7406 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -42,6 +42,8 @@ private[spark] abstract class SparkFunSuite extends FunSuite with Logging {
test()
} finally {
logInfo(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
+ // Avoid leaking map entries in tests that use accumulators without SparkContext
+ Accumulators.clear()
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index e5ec2aa1be..15be0b194e 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -17,12 +17,542 @@
package org.apache.spark.executor
-import org.apache.spark.SparkFunSuite
+import org.scalatest.Assertions
+
+import org.apache.spark._
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId}
+
class TaskMetricsSuite extends SparkFunSuite {
- test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") {
- val taskMetrics = new TaskMetrics()
- taskMetrics.mergeShuffleReadMetrics()
- assert(taskMetrics.shuffleReadMetrics.isEmpty)
+ import AccumulatorParam._
+ import InternalAccumulator._
+ import StorageLevel._
+ import TaskMetricsSuite._
+
+ test("create") {
+ val internalAccums = InternalAccumulator.create()
+ val tm1 = new TaskMetrics
+ val tm2 = new TaskMetrics(internalAccums)
+ assert(tm1.accumulatorUpdates().size === internalAccums.size)
+ assert(tm1.shuffleReadMetrics.isEmpty)
+ assert(tm1.shuffleWriteMetrics.isEmpty)
+ assert(tm1.inputMetrics.isEmpty)
+ assert(tm1.outputMetrics.isEmpty)
+ assert(tm2.accumulatorUpdates().size === internalAccums.size)
+ assert(tm2.shuffleReadMetrics.isEmpty)
+ assert(tm2.shuffleWriteMetrics.isEmpty)
+ assert(tm2.inputMetrics.isEmpty)
+ assert(tm2.outputMetrics.isEmpty)
+ // TaskMetrics constructor expects minimal set of initial accumulators
+ intercept[IllegalArgumentException] { new TaskMetrics(Seq.empty[Accumulator[_]]) }
+ }
+
+ test("create with unnamed accum") {
+ intercept[IllegalArgumentException] {
+ new TaskMetrics(
+ InternalAccumulator.create() ++ Seq(
+ new Accumulator(0, IntAccumulatorParam, None, internal = true)))
+ }
+ }
+
+ test("create with duplicate name accum") {
+ intercept[IllegalArgumentException] {
+ new TaskMetrics(
+ InternalAccumulator.create() ++ Seq(
+ new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
+ }
+ }
+
+ test("create with external accum") {
+ intercept[IllegalArgumentException] {
+ new TaskMetrics(
+ InternalAccumulator.create() ++ Seq(
+ new Accumulator(0, IntAccumulatorParam, Some("x"))))
+ }
+ }
+
+ test("create shuffle read metrics") {
+ import shuffleRead._
+ val accums = InternalAccumulator.createShuffleReadAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(REMOTE_BLOCKS_FETCHED).setValueAny(1)
+ accums(LOCAL_BLOCKS_FETCHED).setValueAny(2)
+ accums(REMOTE_BYTES_READ).setValueAny(3L)
+ accums(LOCAL_BYTES_READ).setValueAny(4L)
+ accums(FETCH_WAIT_TIME).setValueAny(5L)
+ accums(RECORDS_READ).setValueAny(6L)
+ val sr = new ShuffleReadMetrics(accums)
+ assert(sr.remoteBlocksFetched === 1)
+ assert(sr.localBlocksFetched === 2)
+ assert(sr.remoteBytesRead === 3L)
+ assert(sr.localBytesRead === 4L)
+ assert(sr.fetchWaitTime === 5L)
+ assert(sr.recordsRead === 6L)
+ }
+
+ test("create shuffle write metrics") {
+ import shuffleWrite._
+ val accums = InternalAccumulator.createShuffleWriteAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(BYTES_WRITTEN).setValueAny(1L)
+ accums(RECORDS_WRITTEN).setValueAny(2L)
+ accums(WRITE_TIME).setValueAny(3L)
+ val sw = new ShuffleWriteMetrics(accums)
+ assert(sw.bytesWritten === 1L)
+ assert(sw.recordsWritten === 2L)
+ assert(sw.writeTime === 3L)
+ }
+
+ test("create input metrics") {
+ import input._
+ val accums = InternalAccumulator.createInputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(BYTES_READ).setValueAny(1L)
+ accums(RECORDS_READ).setValueAny(2L)
+ accums(READ_METHOD).setValueAny(DataReadMethod.Hadoop.toString)
+ val im = new InputMetrics(accums)
+ assert(im.bytesRead === 1L)
+ assert(im.recordsRead === 2L)
+ assert(im.readMethod === DataReadMethod.Hadoop)
+ }
+
+ test("create output metrics") {
+ import output._
+ val accums = InternalAccumulator.createOutputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
+ accums(BYTES_WRITTEN).setValueAny(1L)
+ accums(RECORDS_WRITTEN).setValueAny(2L)
+ accums(WRITE_METHOD).setValueAny(DataWriteMethod.Hadoop.toString)
+ val om = new OutputMetrics(accums)
+ assert(om.bytesWritten === 1L)
+ assert(om.recordsWritten === 2L)
+ assert(om.writeMethod === DataWriteMethod.Hadoop)
+ }
+
+ test("mutating values") {
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ // initial values
+ assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
+ assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 0L)
+ assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 0L)
+ assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 0L)
+ assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 0L)
+ assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 0L)
+ assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 0L)
+ assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 0L)
+ assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
+ Seq.empty[(BlockId, BlockStatus)])
+ // set or increment values
+ tm.setExecutorDeserializeTime(100L)
+ tm.setExecutorDeserializeTime(1L) // overwrite
+ tm.setExecutorRunTime(200L)
+ tm.setExecutorRunTime(2L)
+ tm.setResultSize(300L)
+ tm.setResultSize(3L)
+ tm.setJvmGCTime(400L)
+ tm.setJvmGCTime(4L)
+ tm.setResultSerializationTime(500L)
+ tm.setResultSerializationTime(5L)
+ tm.incMemoryBytesSpilled(600L)
+ tm.incMemoryBytesSpilled(6L) // add
+ tm.incDiskBytesSpilled(700L)
+ tm.incDiskBytesSpilled(7L)
+ tm.incPeakExecutionMemory(800L)
+ tm.incPeakExecutionMemory(8L)
+ val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L))
+ val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L))
+ tm.incUpdatedBlockStatuses(Seq(block1))
+ tm.incUpdatedBlockStatuses(Seq(block2))
+ // assert new values exist
+ assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 1L)
+ assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 2L)
+ assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 3L)
+ assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 4L)
+ assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 5L)
+ assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 606L)
+ assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 707L)
+ assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 808L)
+ assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
+ Seq(block1, block2))
+ }
+
+ test("mutating shuffle read metrics values") {
+ import shuffleRead._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
+ }
+ // create shuffle read metrics
+ assert(tm.shuffleReadMetrics.isEmpty)
+ tm.registerTempShuffleReadMetrics()
+ tm.mergeShuffleReadMetrics()
+ assert(tm.shuffleReadMetrics.isDefined)
+ val sr = tm.shuffleReadMetrics.get
+ // initial values
+ assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
+ assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
+ assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 0L)
+ assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 0L)
+ assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 0L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ // set and increment values
+ sr.setRemoteBlocksFetched(100)
+ sr.setRemoteBlocksFetched(10)
+ sr.incRemoteBlocksFetched(1) // 10 + 1
+ sr.incRemoteBlocksFetched(1) // 10 + 1 + 1
+ sr.setLocalBlocksFetched(200)
+ sr.setLocalBlocksFetched(20)
+ sr.incLocalBlocksFetched(2)
+ sr.incLocalBlocksFetched(2)
+ sr.setRemoteBytesRead(300L)
+ sr.setRemoteBytesRead(30L)
+ sr.incRemoteBytesRead(3L)
+ sr.incRemoteBytesRead(3L)
+ sr.setLocalBytesRead(400L)
+ sr.setLocalBytesRead(40L)
+ sr.incLocalBytesRead(4L)
+ sr.incLocalBytesRead(4L)
+ sr.setFetchWaitTime(500L)
+ sr.setFetchWaitTime(50L)
+ sr.incFetchWaitTime(5L)
+ sr.incFetchWaitTime(5L)
+ sr.setRecordsRead(600L)
+ sr.setRecordsRead(60L)
+ sr.incRecordsRead(6L)
+ sr.incRecordsRead(6L)
+ // assert new values exist
+ assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 12)
+ assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 24)
+ assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 36L)
+ assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 48L)
+ assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 60L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 72L)
+ }
+
+ test("mutating shuffle write metrics values") {
+ import shuffleWrite._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
+ }
+ // create shuffle write metrics
+ assert(tm.shuffleWriteMetrics.isEmpty)
+ tm.registerShuffleWriteMetrics()
+ assert(tm.shuffleWriteMetrics.isDefined)
+ val sw = tm.shuffleWriteMetrics.get
+ // initial values
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
+ assertValEquals(_.writeTime, WRITE_TIME, 0L)
+ // increment and decrement values
+ sw.incBytesWritten(100L)
+ sw.incBytesWritten(10L) // 100 + 10
+ sw.decBytesWritten(1L) // 100 + 10 - 1
+ sw.decBytesWritten(1L) // 100 + 10 - 1 - 1
+ sw.incRecordsWritten(200L)
+ sw.incRecordsWritten(20L)
+ sw.decRecordsWritten(2L)
+ sw.decRecordsWritten(2L)
+ sw.incWriteTime(300L)
+ sw.incWriteTime(30L)
+ // assert new values exist
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 108L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 216L)
+ assertValEquals(_.writeTime, WRITE_TIME, 330L)
+ }
+
+ test("mutating input metrics values") {
+ import input._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
+ (x: Any, y: Any) => assert(x.toString === y.toString))
+ }
+ // create input metrics
+ assert(tm.inputMetrics.isEmpty)
+ tm.registerInputMetrics(DataReadMethod.Memory)
+ assert(tm.inputMetrics.isDefined)
+ val in = tm.inputMetrics.get
+ // initial values
+ assertValEquals(_.bytesRead, BYTES_READ, 0L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Memory)
+ // set and increment values
+ in.setBytesRead(1L)
+ in.setBytesRead(2L)
+ in.incRecordsRead(1L)
+ in.incRecordsRead(2L)
+ in.setReadMethod(DataReadMethod.Disk)
+ // assert new values exist
+ assertValEquals(_.bytesRead, BYTES_READ, 2L)
+ assertValEquals(_.recordsRead, RECORDS_READ, 3L)
+ assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Disk)
+ }
+
+ test("mutating output metrics values") {
+ import output._
+ val accums = InternalAccumulator.create()
+ val tm = new TaskMetrics(accums)
+ def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
+ assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
+ (x: Any, y: Any) => assert(x.toString === y.toString))
+ }
+ // create input metrics
+ assert(tm.outputMetrics.isEmpty)
+ tm.registerOutputMetrics(DataWriteMethod.Hadoop)
+ assert(tm.outputMetrics.isDefined)
+ val out = tm.outputMetrics.get
+ // initial values
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
+ assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
+ // set values
+ out.setBytesWritten(1L)
+ out.setBytesWritten(2L)
+ out.setRecordsWritten(3L)
+ out.setRecordsWritten(4L)
+ out.setWriteMethod(DataWriteMethod.Hadoop)
+ // assert new values exist
+ assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
+ assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
+ // Note: this doesn't actually test anything, but there's only one DataWriteMethod
+ // so we can't set it to anything else
+ assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
+ }
+
+ test("merging multiple shuffle read metrics") {
+ val tm = new TaskMetrics
+ assert(tm.shuffleReadMetrics.isEmpty)
+ val sr1 = tm.registerTempShuffleReadMetrics()
+ val sr2 = tm.registerTempShuffleReadMetrics()
+ val sr3 = tm.registerTempShuffleReadMetrics()
+ assert(tm.shuffleReadMetrics.isEmpty)
+ sr1.setRecordsRead(10L)
+ sr2.setRecordsRead(10L)
+ sr1.setFetchWaitTime(1L)
+ sr2.setFetchWaitTime(2L)
+ sr3.setFetchWaitTime(3L)
+ tm.mergeShuffleReadMetrics()
+ assert(tm.shuffleReadMetrics.isDefined)
+ val sr = tm.shuffleReadMetrics.get
+ assert(sr.remoteBlocksFetched === 0L)
+ assert(sr.recordsRead === 20L)
+ assert(sr.fetchWaitTime === 6L)
+
+ // SPARK-5701: calling merge without any shuffle deps does nothing
+ val tm2 = new TaskMetrics
+ tm2.mergeShuffleReadMetrics()
+ assert(tm2.shuffleReadMetrics.isEmpty)
+ }
+
+ test("register multiple shuffle write metrics") {
+ val tm = new TaskMetrics
+ val sw1 = tm.registerShuffleWriteMetrics()
+ val sw2 = tm.registerShuffleWriteMetrics()
+ assert(sw1 === sw2)
+ assert(tm.shuffleWriteMetrics === Some(sw1))
+ }
+
+ test("register multiple input metrics") {
+ val tm = new TaskMetrics
+ val im1 = tm.registerInputMetrics(DataReadMethod.Memory)
+ val im2 = tm.registerInputMetrics(DataReadMethod.Memory)
+ // input metrics with a different read method than the one already registered are ignored
+ val im3 = tm.registerInputMetrics(DataReadMethod.Hadoop)
+ assert(im1 === im2)
+ assert(im1 !== im3)
+ assert(tm.inputMetrics === Some(im1))
+ im2.setBytesRead(50L)
+ im3.setBytesRead(100L)
+ assert(tm.inputMetrics.get.bytesRead === 50L)
+ }
+
+ test("register multiple output metrics") {
+ val tm = new TaskMetrics
+ val om1 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
+ val om2 = tm.registerOutputMetrics(DataWriteMethod.Hadoop)
+ assert(om1 === om2)
+ assert(tm.outputMetrics === Some(om1))
+ }
+
+ test("additional accumulables") {
+ val internalAccums = InternalAccumulator.create()
+ val tm = new TaskMetrics(internalAccums)
+ assert(tm.accumulatorUpdates().size === internalAccums.size)
+ val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
+ val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
+ val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
+ val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"),
+ internal = true, countFailedValues = true)
+ tm.registerAccumulator(acc1)
+ tm.registerAccumulator(acc2)
+ tm.registerAccumulator(acc3)
+ tm.registerAccumulator(acc4)
+ acc1 += 1
+ acc2 += 2
+ val newUpdates = tm.accumulatorUpdates().map { a => (a.id, a) }.toMap
+ assert(newUpdates.contains(acc1.id))
+ assert(newUpdates.contains(acc2.id))
+ assert(newUpdates.contains(acc3.id))
+ assert(newUpdates.contains(acc4.id))
+ assert(newUpdates(acc1.id).name === Some("a"))
+ assert(newUpdates(acc2.id).name === Some("b"))
+ assert(newUpdates(acc3.id).name === Some("c"))
+ assert(newUpdates(acc4.id).name === Some("d"))
+ assert(newUpdates(acc1.id).update === Some(1))
+ assert(newUpdates(acc2.id).update === Some(2))
+ assert(newUpdates(acc3.id).update === Some(0))
+ assert(newUpdates(acc4.id).update === Some(0))
+ assert(!newUpdates(acc3.id).internal)
+ assert(!newUpdates(acc3.id).countFailedValues)
+ assert(newUpdates(acc4.id).internal)
+ assert(newUpdates(acc4.id).countFailedValues)
+ assert(newUpdates.values.map(_.update).forall(_.isDefined))
+ assert(newUpdates.values.map(_.value).forall(_.isEmpty))
+ assert(newUpdates.size === internalAccums.size + 4)
+ }
+
+ test("existing values in shuffle read accums") {
+ // set shuffle read accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
+ assert(srAccum.isDefined)
+ srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm = new TaskMetrics(accums)
+ assert(tm.shuffleReadMetrics.isDefined)
+ assert(tm.shuffleWriteMetrics.isEmpty)
+ assert(tm.inputMetrics.isEmpty)
+ assert(tm.outputMetrics.isEmpty)
+ }
+
+ test("existing values in shuffle write accums") {
+ // set shuffle write accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
+ assert(swAccum.isDefined)
+ swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm = new TaskMetrics(accums)
+ assert(tm.shuffleReadMetrics.isEmpty)
+ assert(tm.shuffleWriteMetrics.isDefined)
+ assert(tm.inputMetrics.isEmpty)
+ assert(tm.outputMetrics.isEmpty)
+ }
+
+ test("existing values in input accums") {
+ // set input accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
+ assert(inAccum.isDefined)
+ inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm = new TaskMetrics(accums)
+ assert(tm.shuffleReadMetrics.isEmpty)
+ assert(tm.shuffleWriteMetrics.isEmpty)
+ assert(tm.inputMetrics.isDefined)
+ assert(tm.outputMetrics.isEmpty)
}
+
+ test("existing values in output accums") {
+ // set output accum before passing it into TaskMetrics
+ val accums = InternalAccumulator.create()
+ val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
+ assert(outAccum.isDefined)
+ outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
+ val tm4 = new TaskMetrics(accums)
+ assert(tm4.shuffleReadMetrics.isEmpty)
+ assert(tm4.shuffleWriteMetrics.isEmpty)
+ assert(tm4.inputMetrics.isEmpty)
+ assert(tm4.outputMetrics.isDefined)
+ }
+
+ test("from accumulator updates") {
+ val accumUpdates1 = InternalAccumulator.create().map { a =>
+ AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
+ }
+ val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
+ assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
+ // Test this with additional accumulators. Only the ones registered with `Accumulators`
+ // will show up in the reconstructed TaskMetrics. In practice, all accumulators created
+ // on the driver, internal or not, should be registered with `Accumulators` at some point.
+ // Here we show that reconstruction will succeed even if there are unregistered accumulators.
+ val param = IntAccumulatorParam
+ val registeredAccums = Seq(
+ new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
+ new Accumulator(0, param, Some("b"), internal = true, countFailedValues = false),
+ new Accumulator(0, param, Some("c"), internal = false, countFailedValues = true),
+ new Accumulator(0, param, Some("d"), internal = false, countFailedValues = false))
+ val unregisteredAccums = Seq(
+ new Accumulator(0, param, Some("e"), internal = true, countFailedValues = true),
+ new Accumulator(0, param, Some("f"), internal = true, countFailedValues = false))
+ registeredAccums.foreach(Accumulators.register)
+ registeredAccums.foreach { a => assert(Accumulators.originals.contains(a.id)) }
+ unregisteredAccums.foreach { a => assert(!Accumulators.originals.contains(a.id)) }
+ // set some values in these accums
+ registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
+ unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
+ val registeredAccumInfos = registeredAccums.map(makeInfo)
+ val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
+ val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
+ val metrics2 = TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
+ // accumulators that were not registered with `Accumulators` will not show up
+ assertUpdatesEquals(metrics2.accumulatorUpdates(), accumUpdates1 ++ registeredAccumInfos)
+ }
+}
+
+
+private[spark] object TaskMetricsSuite extends Assertions {
+
+ /**
+ * Assert that the following three things are equal to `value`:
+ * (1) TaskMetrics value
+ * (2) TaskMetrics accumulator update value
+ * (3) Original accumulator value
+ */
+ def assertValueEquals(
+ tm: TaskMetrics,
+ tmValue: TaskMetrics => Any,
+ accums: Seq[Accumulator[_]],
+ metricName: String,
+ value: Any,
+ assertEquals: (Any, Any) => Unit = (x: Any, y: Any) => assert(x === y)): Unit = {
+ assertEquals(tmValue(tm), value)
+ val accum = accums.find(_.name == Some(metricName))
+ assert(accum.isDefined)
+ assertEquals(accum.get.value, value)
+ val accumUpdate = tm.accumulatorUpdates().find(_.name == Some(metricName))
+ assert(accumUpdate.isDefined)
+ assert(accumUpdate.get.value === None)
+ assertEquals(accumUpdate.get.update, Some(value))
+ }
+
+ /**
+ * Assert that two lists of accumulator updates are equal.
+ * Note: this does NOT check accumulator ID equality.
+ */
+ def assertUpdatesEquals(
+ updates1: Seq[AccumulableInfo],
+ updates2: Seq[AccumulableInfo]): Unit = {
+ assert(updates1.size === updates2.size)
+ updates1.zip(updates2).foreach { case (info1, info2) =>
+ // do not assert ID equality here
+ assert(info1.name === info2.name)
+ assert(info1.update === info2.update)
+ assert(info1.value === info2.value)
+ assert(info1.internal === info2.internal)
+ assert(info1.countFailedValues === info2.countFailedValues)
+ }
+ }
+
+ /**
+ * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
+ * info as an accumulator update.
+ */
+ def makeInfo(a: Accumulable[_, _]): AccumulableInfo = {
+ new AccumulableInfo(a.id, a.name, Some(a.value), None, a.isInternal, a.countFailedValues)
+ }
+
}
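
Aside: a minimal sketch of the reconstruction path that the "from accumulator updates" test above exercises, assuming it runs inside this suite with the same imports and helpers; the 0L update value is arbitrary.

    // Sketch only: build updates for the internal accumulators, reconstruct a
    // TaskMetrics from them, and check the round trip preserves the updates.
    val updates = InternalAccumulator.create().map { a =>
      new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
    }
    val reconstructed = TaskMetrics.fromAccumulatorUpdates(updates)
    TaskMetricsSuite.assertUpdatesEquals(reconstructed.accumulatorUpdates(), updates)
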
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
index 0e60cc8e77..2b5e4b80e9 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
@@ -31,7 +31,6 @@ object MemoryTestingUtils {
taskAttemptId = 0,
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
- metricsSystem = env.metricsSystem,
- internalAccumulators = Seq.empty)
+ metricsSystem = env.metricsSystem)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 370a284d29..d9c71ec2ea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
-import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
@@ -96,8 +95,7 @@ class MyRDD(
class DAGSchedulerSuiteDummyException extends Exception
-class DAGSchedulerSuite
- extends SparkFunSuite with BeforeAndAfter with LocalSparkContext with Timeouts {
+class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts {
val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
@@ -111,8 +109,10 @@ class DAGSchedulerSuite
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
- override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
- blockManagerId: BlockManagerId): Boolean = true
+ override def executorHeartbeatReceived(
+ execId: String,
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ blockManagerId: BlockManagerId): Boolean = true
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
@@ -189,7 +189,8 @@ class DAGSchedulerSuite
override def jobFailed(exception: Exception): Unit = { failure = exception }
}
- before {
+ override def beforeEach(): Unit = {
+ super.beforeEach()
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
@@ -202,17 +203,21 @@ class DAGSchedulerSuite
results.clear()
mapOutputTracker = new MapOutputTrackerMaster(conf)
scheduler = new DAGScheduler(
- sc,
- taskScheduler,
- sc.listenerBus,
- mapOutputTracker,
- blockManagerMaster,
- sc.env)
+ sc,
+ taskScheduler,
+ sc.listenerBus,
+ mapOutputTracker,
+ blockManagerMaster,
+ sc.env)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
}
- after {
- scheduler.stop()
+ override def afterEach(): Unit = {
+ try {
+ scheduler.stop()
+ } finally {
+ super.afterEach()
+ }
}
override def afterAll() {
@@ -242,26 +247,31 @@ class DAGSchedulerSuite
* directly through CompletionEvents.
*/
private val jobComputeFunc = (context: TaskContext, it: Iterator[(_)]) =>
- it.next.asInstanceOf[Tuple2[_, _]]._1
+ it.next.asInstanceOf[Tuple2[_, _]]._1
/** Send the given CompletionEvent messages for the tasks in the TaskSet. */
private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(
- taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2))
}
}
}
- private def completeWithAccumulator(accumId: Long, taskSet: TaskSet,
- results: Seq[(TaskEndReason, Any)]) {
+ private def completeWithAccumulator(
+ accumId: Long,
+ taskSet: TaskSet,
+ results: Seq[(TaskEndReason, Any)]) {
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
- Map[Long, Any]((accumId, 1)), createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(
+ taskSet.tasks(i),
+ result._1,
+ result._2,
+ Seq(new AccumulableInfo(
+ accumId, Some(""), Some(1), None, internal = false, countFailedValues = false))))
}
}
}
@@ -338,9 +348,12 @@ class DAGSchedulerSuite
}
test("equals and hashCode AccumulableInfo") {
- val accInfo1 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, true)
- val accInfo2 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
- val accInfo3 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
+ val accInfo1 = new AccumulableInfo(
+ 1, Some("a1"), Some("delta1"), Some("val1"), internal = true, countFailedValues = false)
+ val accInfo2 = new AccumulableInfo(
+ 1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
+ val accInfo3 = new AccumulableInfo(
+ 1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
assert(accInfo1 !== accInfo2)
assert(accInfo2 === accInfo3)
assert(accInfo2.hashCode() === accInfo3.hashCode())
@@ -464,7 +477,7 @@ class DAGSchedulerSuite
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
execId: String,
- taskMetrics: Array[(Long, TaskMetrics)],
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def applicationAttemptId(): Option[String] = None
@@ -499,8 +512,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
complete(taskSets(1), Seq((Success, 42)))
@@ -515,12 +528,12 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
- (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
- (Success, 42),
- (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
+ (Success, 42),
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
// this will get called
// blockManagerMaster.removeExecutor("exec-hostA")
// ask the scheduler to try it again
@@ -829,23 +842,17 @@ class DAGSchedulerSuite
HashSet("hostA", "hostB"))
// The first result task fails, with a fetch failure for the output from the first mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(1))
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// The SparkListener should not receive redundant failure events.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
@@ -882,12 +889,9 @@ class DAGSchedulerSuite
HashSet("hostA", "hostB"))
// The first result task fails, with a fetch failure for the output from the first mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.failedStages.contains(1))
@@ -900,12 +904,9 @@ class DAGSchedulerSuite
assert(countSubmittedMapStageAttempts() === 2)
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// Another ResubmitFailedStages event should not result in another attempt for the map
@@ -920,11 +921,11 @@ class DAGSchedulerSuite
}
/**
- * This tests the case where a late FetchFailed comes in after the map stage has finished getting
- * retried and a new reduce stage starts running.
- */
+ * This tests the case where a late FetchFailed comes in after the map stage has finished getting
+ * retried and a new reduce stage starts running.
+ */
test("extremely late fetch failures don't cause multiple concurrent attempts for " +
- "the same stage") {
+ "the same stage") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
@@ -952,12 +953,9 @@ class DAGSchedulerSuite
assert(countSubmittedReduceStageAttempts() === 1)
// The first result task fails, with a fetch failure for the output from the first mapper.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// Trigger resubmission of the failed map stage and finish the re-started map task.
@@ -971,12 +969,9 @@ class DAGSchedulerSuite
assert(countSubmittedReduceStageAttempts() === 2)
// A late FetchFailed arrives from the second task in the original reduce stage.
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
- null,
- Map[Long, Any](),
- createFakeTaskInfo(),
null))
// Running ResubmitFailedStages shouldn't result in any more attempts for the map stage, because
@@ -1007,48 +1002,36 @@ class DAGSchedulerSuite
assert(shuffleStage.numAvailableOutputs === 0)
// should be ignored for being too old
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(0),
Success,
- makeMapStatus("hostA", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostA", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 0)
// should work because it's a non-failed host (so the available map outputs will increase)
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(0),
Success,
- makeMapStatus("hostB", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostB", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 1)
// should be ignored for being too old
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(0),
Success,
- makeMapStatus("hostA", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostA", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 1)
// should work because it's a new epoch, which will increase the number of available map
// outputs, and also finish the stage
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSet.tasks(1),
Success,
- makeMapStatus("hostA", reduceRdd.partitions.size),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostA", reduceRdd.partitions.size)))
assert(shuffleStage.numAvailableOutputs === 2)
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
- HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
+ HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
// finish the next stage normally, which completes the job
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -1140,12 +1123,9 @@ class DAGSchedulerSuite
// then one executor dies, and a task fails in stage 1
runEvent(ExecutorLost("exec-hostA"))
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
- null,
- null,
- createFakeTaskInfo(),
null))
// so we resubmit stage 0, which completes happily
@@ -1155,13 +1135,10 @@ class DAGSchedulerSuite
assert(stage0Resubmit.stageAttemptId === 1)
val task = stage0Resubmit.tasks(0)
assert(task.partitionId === 2)
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
task,
Success,
- makeMapStatus("hostC", shuffleMapRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
// now here is where things get tricky: we will have a task set representing
// the second attempt for stage 1, but we *also* have some tasks for the first attempt for
@@ -1174,28 +1151,19 @@ class DAGSchedulerSuite
// we'll have some tasks finish from the first attempt, and some finish from the second attempt,
// so that we actually have all stage outputs, though no attempt has completed all its
// tasks
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(3).tasks(0),
Success,
- makeMapStatus("hostC", reduceRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
- runEvent(CompletionEvent(
+ makeMapStatus("hostC", reduceRdd.partitions.length)))
+ runEvent(makeCompletionEvent(
taskSets(3).tasks(1),
Success,
- makeMapStatus("hostC", reduceRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostC", reduceRdd.partitions.length)))
// late task finish from the first attempt
- runEvent(CompletionEvent(
+ runEvent(makeCompletionEvent(
taskSets(1).tasks(2),
Success,
- makeMapStatus("hostB", reduceRdd.partitions.length),
- null,
- createFakeTaskInfo(),
- null))
+ makeMapStatus("hostB", reduceRdd.partitions.length)))
// What should happen now is that we submit stage 2. However, we might not see an error
// because of DAGScheduler's error handling (it tends to swallow errors and just log them). But
@@ -1242,21 +1210,21 @@ class DAGSchedulerSuite
submit(reduceRdd, Array(0))
// complete some of the tasks from the first stage, on one host
- runEvent(CompletionEvent(
- taskSets(0).tasks(0), Success,
- makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
- runEvent(CompletionEvent(
- taskSets(0).tasks(1), Success,
- makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(0),
+ Success,
+ makeMapStatus("hostA", reduceRdd.partitions.length)))
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(1),
+ Success,
+ makeMapStatus("hostA", reduceRdd.partitions.length)))
// now that host goes down
runEvent(ExecutorLost("exec-hostA"))
// so we resubmit those tasks
- runEvent(CompletionEvent(
- taskSets(0).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), null))
- runEvent(CompletionEvent(
- taskSets(0).tasks(1), Resubmitted, null, null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
+ runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))
// now complete everything on a different host
complete(taskSets(0), Seq(
@@ -1449,12 +1417,12 @@ class DAGSchedulerSuite
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
// have hostC complete the resubmitted task
complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
- HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+ HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
@@ -1469,15 +1437,15 @@ class DAGSchedulerSuite
submit(finalRdd, Array(0))
// have the first stage complete normally
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 2)),
- (Success, makeMapStatus("hostB", 2))))
+ (Success, makeMapStatus("hostA", 2)),
+ (Success, makeMapStatus("hostB", 2))))
// have the second stage complete normally
complete(taskSets(1), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostC", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// have DAGScheduler try again
@@ -1500,15 +1468,15 @@ class DAGSchedulerSuite
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
// complete stage 0
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 2)),
- (Success, makeMapStatus("hostB", 2))))
+ (Success, makeMapStatus("hostA", 2)),
+ (Success, makeMapStatus("hostB", 2))))
// complete stage 1
complete(taskSets(1), Seq(
- (Success, makeMapStatus("hostA", 1)),
- (Success, makeMapStatus("hostB", 1))))
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
// pretend stage 2 failed because hostA went down
complete(taskSets(2), Seq(
- (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this:
// blockManagerMaster.removeExecutor("exec-hostA")
// DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
@@ -1606,6 +1574,28 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+ test("accumulators are updated on exception failures") {
+ val acc1 = sc.accumulator(0L, "ingenieur")
+ val acc2 = sc.accumulator(0L, "boulanger")
+ val acc3 = sc.accumulator(0L, "agriculteur")
+ assert(Accumulators.get(acc1.id).isDefined)
+ assert(Accumulators.get(acc2.id).isDefined)
+ assert(Accumulators.get(acc3.id).isDefined)
+ val accInfo1 = new AccumulableInfo(
+ acc1.id, acc1.name, Some(15L), None, internal = false, countFailedValues = false)
+ val accInfo2 = new AccumulableInfo(
+ acc2.id, acc2.name, Some(13L), None, internal = false, countFailedValues = false)
+ val accInfo3 = new AccumulableInfo(
+ acc3.id, acc3.name, Some(18L), None, internal = false, countFailedValues = false)
+ val accumUpdates = Seq(accInfo1, accInfo2, accInfo3)
+ val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates)
+ submit(new MyRDD(sc, 1, Nil), Array(0))
+ runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
+ assert(Accumulators.get(acc1.id).get.value === 15L)
+ assert(Accumulators.get(acc2.id).get.value === 13L)
+ assert(Accumulators.get(acc3.id).get.value === 18L)
+ }
+
test("reduce tasks should be placed locally with map output") {
// Create a shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
@@ -1614,9 +1604,9 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", 1))))
+ (Success, makeMapStatus("hostA", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
- HashSet(makeBlockManagerId("hostA")))
+ HashSet(makeBlockManagerId("hostA")))
// Reducer should run on the same host that map task ran
val reduceTaskSet = taskSets(1)
@@ -1884,8 +1874,7 @@ class DAGSchedulerSuite
submitMapStage(shuffleDep)
val oldTaskSet = taskSets(0)
- runEvent(CompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
// Pretend host A was lost
@@ -1895,23 +1884,19 @@ class DAGSchedulerSuite
assert(newEpoch > oldEpoch)
// Suppose we also get a completed event from task 1 on the same host; this should be ignored
- runEvent(CompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
// A completion from another task should work because it's a non-failed host
- runEvent(CompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
- runEvent(CompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
- runEvent(CompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2),
- null, createFakeTaskInfo(), null))
+ runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
assert(results.size === 1) // Map stage job should now finally be complete
assertDataStructuresEmpty()
@@ -1962,5 +1947,21 @@ class DAGSchedulerSuite
info
}
-}
+ private def makeCompletionEvent(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
+ taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
+ val accumUpdates = reason match {
+ case Success =>
+ task.initialAccumulators.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(a.zero), None, a.isInternal, a.countFailedValues)
+ }
+ case ef: ExceptionFailure => ef.accumUpdates
+ case _ => Seq.empty[AccumulableInfo]
+ }
+ CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
+ }
+}
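
Aside: a hedged usage sketch of the makeCompletionEvent helper added above, written as if inside this suite. It posts a successful map-task completion that also carries one extra accumulator update; the id 42L and the name "extra" are made up for illustration.

    val extra = new AccumulableInfo(
      42L, Some("extra"), Some(1L), None, internal = false, countFailedValues = false)
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(0),
      Success,
      makeMapStatus("hostA", 1),
      extraAccumUpdates = Seq(extra)))
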
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 761e82e6cf..35215c15ea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
/**
* Test whether ReplayListenerBus replays events from logs correctly.
@@ -131,7 +131,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(sc.eventLogger.isDefined)
val originalEvents = sc.eventLogger.get.loggedEvents
val replayedEvents = eventMonster.loggedEvents
- originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
+ originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
+ // Don't compare the JSON here because accumulators in StageInfo may be out of order
+ JsonProtocolSuite.assertEquals(
+ JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2))
+ }
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index e5ec44a9f3..b3bb86db10 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -22,6 +22,8 @@ import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.apache.spark._
+import org.apache.spark.executor.TaskMetricsSuite
+import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
@@ -57,8 +59,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
- val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, Seq.empty)
+ val task = new ResultTask[String, String](0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0)
intercept[RuntimeException] {
task.run(0, 0, null)
}
@@ -97,6 +98,57 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}.collect()
assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
}
+
+ test("accumulators are updated on exception failures") {
+ // "local[1,4]" means use 1 core and allow up to 4 task failures
+ sc = new SparkContext("local[1,4]", "test")
+ val param = AccumulatorParam.LongAccumulatorParam
+ // Create 2 accumulators, one that counts failed values and another that doesn't
+ val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ // Fail first 3 attempts of every task. This means each task should be run 4 times.
+ sc.parallelize(1 to 10, 10).map { i =>
+ acc1 += 1
+ acc2 += 1
+ if (TaskContext.get.attemptNumber() <= 2) {
+ throw new Exception("you did something wrong")
+ } else {
+ 0
+ }
+ }.count()
+ // The one that counts failed values should be 4x the one that didn't,
+ // since we ran each task 4 times
+ assert(Accumulators.get(acc1.id).get.value === 40L)
+ assert(Accumulators.get(acc2.id).get.value === 10L)
+ }
+
+ test("failed tasks collect only accumulators whose values count during failures") {
+ sc = new SparkContext("local", "test")
+ val param = AccumulatorParam.LongAccumulatorParam
+ val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ val initialAccums = InternalAccumulator.create()
+ // Create a dummy task. We won't end up running this; we just want to collect
+ // accumulator updates from it.
+ val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
+ context = new TaskContextImpl(0, 0, 0L, 0,
+ new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+ SparkEnv.get.metricsSystem,
+ initialAccums)
+ context.taskMetrics.registerAccumulator(acc1)
+ context.taskMetrics.registerAccumulator(acc2)
+ override def runTask(tc: TaskContext): Int = 0
+ }
+ // First, simulate task success. This should give us all the accumulators.
+ val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
+ val accumUpdates2 = (initialAccums ++ Seq(acc1, acc2)).map(TaskMetricsSuite.makeInfo)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
+ // Now, simulate task failures. This should give us only the accums that count failed values.
+ val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
+ val accumUpdates4 = (initialAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
+ }
+
}
private object TaskContextSuite {
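
Aside: a back-of-the-envelope check of the totals asserted in "accumulators are updated on exception failures" above, under the stated assumption that each of the 10 tasks fails its first three attempts and succeeds on the fourth.

    val numTasks = 10
    val attemptsPerTask = 4                                    // 3 failed attempts + 1 success
    val expectedCountingFailures = numTasks * attemptsPerTask  // 40, matching the acc1 assertion
    val expectedSuccessOnly = numTasks * 1                     // 10, matching the acc2 assertion
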
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index cc2557c2f1..b5385c11a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -21,10 +21,15 @@ import java.io.File
import java.net.URL
import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NonFatal
+import com.google.common.util.concurrent.MoreExecutors
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers.{any, anyLong}
+import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually._
@@ -33,13 +38,14 @@ import org.apache.spark.storage.TaskResultBlockId
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
+
/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
*
* Used to test the case where a BlockManager evicts the task result (or dies) before the
* TaskResult is retrieved.
*/
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
+private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends TaskResultGetter(sparkEnv, scheduler) {
var removedResult = false
@@ -72,6 +78,31 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
}
}
+
+/**
+ * A [[TaskResultGetter]] that stores the [[DirectTaskResult]]s it receives from executors
+ * _before_ modifying the results in any way.
+ */
+private class MyTaskResultGetter(env: SparkEnv, scheduler: TaskSchedulerImpl)
+ extends TaskResultGetter(env, scheduler) {
+
+ // Use the current thread so we can access its results synchronously
+ protected override val getTaskResultExecutor = MoreExecutors.sameThreadExecutor()
+
+ // DirectTaskResults that we receive from the executors
+ private val _taskResults = new ArrayBuffer[DirectTaskResult[_]]
+
+ def taskResults: Seq[DirectTaskResult[_]] = _taskResults
+
+ override def enqueueSuccessfulTask(tsm: TaskSetManager, tid: Long, data: ByteBuffer): Unit = {
+ // work on a copy since the super class still needs to use the buffer
+ val newBuffer = data.duplicate()
+ _taskResults += env.closureSerializer.newInstance().deserialize[DirectTaskResult[_]](newBuffer)
+ super.enqueueSuccessfulTask(tsm, tid, data)
+ }
+}
+
+
/**
* Tests related to handling task results (both direct and indirect).
*/
@@ -182,5 +213,39 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
+
+ test("task result size is set on the driver, not the executors") {
+ import InternalAccumulator._
+
+ // Set up custom TaskResultGetter and TaskSchedulerImpl spy
+ sc = new SparkContext("local", "test", conf)
+ val scheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+ val spyScheduler = spy(scheduler)
+ val resultGetter = new MyTaskResultGetter(sc.env, spyScheduler)
+ val newDAGScheduler = new DAGScheduler(sc, spyScheduler)
+ scheduler.taskResultGetter = resultGetter
+ sc.dagScheduler = newDAGScheduler
+ sc.taskScheduler = spyScheduler
+ sc.taskScheduler.setDAGScheduler(newDAGScheduler)
+
+ // Just run 1 task and capture the corresponding DirectTaskResult
+ sc.parallelize(1 to 1, 1).count()
+ val captor = ArgumentCaptor.forClass(classOf[DirectTaskResult[_]])
+ verify(spyScheduler, times(1)).handleSuccessfulTask(any(), anyLong(), captor.capture())
+
+ // When a task finishes, the executor sends a serialized DirectTaskResult to the driver
+ // without setting the result size so as to avoid serializing the result again. Instead,
+ // the result size is set later in TaskResultGetter on the driver before passing the
+ // DirectTaskResult on to TaskSchedulerImpl. In this test, we capture the DirectTaskResult
+ // before and after the result size is set.
+ assert(resultGetter.taskResults.size === 1)
+ val resBefore = resultGetter.taskResults.head
+ val resAfter = captor.getValue
+ val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
+ val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
+ assert(resSizeBefore.exists(_ == 0L))
+ assert(resSizeAfter.exists(_.toString.toLong > 0L))
+ }
+
}
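
Aside: a minimal sketch of the lookup the result-size test above performs twice, reading the RESULT_SIZE internal accumulator update carried by a DirectTaskResult; it assumes the imports already in scope in this suite.

    def resultSizeUpdate(res: DirectTaskResult[_]): Option[Any] =
      res.accumUpdates.find(_.name == Some(InternalAccumulator.RESULT_SIZE)).flatMap(_.update)
    // resultSizeUpdate(resBefore) is Some(0L); resultSizeUpdate(resAfter) is a positive size.
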
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ecc18fc6e1..a2e7436564 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ManualClock
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
@@ -38,9 +37,8 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics) {
+ accumUpdates: Seq[AccumulableInfo],
+ taskInfo: TaskInfo) {
taskScheduler.endedTasks(taskInfo.index) = reason
}
@@ -167,14 +165,17 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+ val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
+ }
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
- var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
// Tell it the task has finished
- manager.handleSuccessfulTask(0, createTaskResult(0))
+ manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
assert(sched.endedTasks(0) === Success)
assert(sched.finishedManagers.contains(manager))
}
@@ -184,10 +185,15 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+ val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
+ task.initialAccumulators.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
+ }
+ }
// First three offers should all find tasks
for (i <- 0 until 3) {
- var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
@@ -198,14 +204,14 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)
// Finish the first two tasks
- manager.handleSuccessfulTask(0, createTaskResult(0))
- manager.handleSuccessfulTask(1, createTaskResult(1))
+ manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdatesByTask(0)))
+ manager.handleSuccessfulTask(1, createTaskResult(1, accumUpdatesByTask(1)))
assert(sched.endedTasks(0) === Success)
assert(sched.endedTasks(1) === Success)
assert(!sched.finishedManagers.contains(manager))
// Finish the last task
- manager.handleSuccessfulTask(2, createTaskResult(2))
+ manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2)))
assert(sched.endedTasks(2) === Success)
assert(sched.finishedManagers.contains(manager))
}
@@ -620,7 +626,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// multiple 1k result
val r = sc.makeRDD(0 until 10, 10).map(genBytes(1024)).collect()
- assert(10 === r.size )
+ assert(10 === r.size)
// single 10M result
val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()}
@@ -761,7 +767,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Regression test for SPARK-2931
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc,
- ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(3,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
@@ -786,8 +792,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
}
- def createTaskResult(id: Int): DirectTaskResult[Int] = {
+ private def createTaskResult(
+ id: Int,
+ accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
- new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
+ new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 86699e7f56..b83ffa3282 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.ui.scope.RDDOperationGraphListener
class StagePageSuite extends SparkFunSuite with LocalSparkContext {
+ private val peakExecutionMemory = 10
+
test("peak execution memory only displayed if unsafe is enabled") {
val unsafeConf = "spark.sql.unsafe.enabled"
val conf = new SparkConf(false).set(unsafeConf, "true")
@@ -52,7 +54,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val conf = new SparkConf(false).set(unsafeConf, "true")
val html = renderStagePage(conf).toString().toLowerCase
// verify min/25/50/75/max show task value not cumulative values
- assert(html.contains("<td>10.0 b</td>" * 5))
+ assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
}
/**
@@ -79,14 +81,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
(1 to 2).foreach {
taskId =>
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
- val peakExecutionMemory = 10
- taskInfo.accumulables += new AccumulableInfo(0, InternalAccumulator.PEAK_EXECUTION_MEMORY,
- Some(peakExecutionMemory.toString), (peakExecutionMemory * taskId).toString, true)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
taskInfo.markSuccessful()
+ val taskMetrics = TaskMetrics.empty
+ taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
jobListener.onTaskEnd(
- SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+ SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
}
jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
page.render(request)
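
Aside: a minimal sketch of the new path by which peak execution memory reaches the listener in the hunk above: it is set on TaskMetrics rather than attached to TaskInfo.accumulables.

    val tm = TaskMetrics.empty
    tm.incPeakExecutionMemory(peakExecutionMemory)
    // The value then rides along on the task-end event:
    // SparkListenerTaskEnd(stageId, stageAttemptId, "result", Success, taskInfo, tm)
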
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 607617cbe9..18a16a25bf 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -240,7 +240,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val taskFailedReasons = Seq(
Resubmitted,
new FetchFailed(null, 0, 0, 0, "ignored"),
- ExceptionFailure("Exception", "description", null, null, None, None),
+ ExceptionFailure("Exception", "description", null, null, None),
TaskResultLost,
TaskKilled,
ExecutorLostFailure("0", true, Some("Induced failure")),
@@ -269,20 +269,22 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"
def makeTaskMetrics(base: Int): TaskMetrics = {
- val taskMetrics = new TaskMetrics()
- taskMetrics.setExecutorRunTime(base + 4)
- taskMetrics.incDiskBytesSpilled(base + 5)
- taskMetrics.incMemoryBytesSpilled(base + 6)
+ val accums = InternalAccumulator.create()
+ accums.foreach(Accumulators.register)
+ val taskMetrics = new TaskMetrics(accums)
val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
+ val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
+ val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
+ val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
taskMetrics.mergeShuffleReadMetrics()
- val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
shuffleWriteMetrics.incBytesWritten(base + 3)
- val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
- inputMetrics.incBytesRead(base + 7)
- val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
+ taskMetrics.setExecutorRunTime(base + 4)
+ taskMetrics.incDiskBytesSpilled(base + 5)
+ taskMetrics.incMemoryBytesSpilled(base + 6)
+ inputMetrics.setBytesRead(base + 7)
outputMetrics.setBytesWritten(base + 8)
taskMetrics
}
@@ -300,9 +302,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
- (1234L, 0, 0, makeTaskMetrics(0)),
- (1235L, 0, 0, makeTaskMetrics(100)),
- (1236L, 1, 0, makeTaskMetrics(200)))))
+ (1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()),
+ (1235L, 0, 0, makeTaskMetrics(100).accumulatorUpdates()),
+ (1236L, 1, 0, makeTaskMetrics(200).accumulatorUpdates()))))
var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
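
Aside: a hedged sketch of the heartbeat payload shape the update above switches to (per-task accumulator updates instead of full TaskMetrics), reusing the execId, listener and makeTaskMetrics values defined in the test.

    val updates: Array[(Long, Int, Int, Seq[AccumulableInfo])] =
      Array((1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()))
    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, updates))
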
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index e5ca2de4ad..57021d1d3d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -22,6 +22,10 @@ import java.util.Properties
import scala.collection.Map
import org.json4s.jackson.JsonMethods._
+import org.json4s.JsonAST.{JArray, JInt, JString, JValue}
+import org.json4s.JsonDSL._
+import org.scalatest.Assertions
+import org.scalatest.exceptions.TestFailedException
import org.apache.spark._
import org.apache.spark.executor._
@@ -32,12 +36,7 @@ import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage._
class JsonProtocolSuite extends SparkFunSuite {
-
- val jobSubmissionTime = 1421191042750L
- val jobCompletionTime = 1421191296660L
-
- val executorAddedTime = 1421458410000L
- val executorRemovedTime = 1421458922000L
+ import JsonProtocolSuite._
test("SparkListenerEvent") {
val stageSubmitted =
@@ -82,9 +81,13 @@ class JsonProtocolSuite extends SparkFunSuite {
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
- val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq(
- (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
- hasHadoopInput = true, hasOutput = true))))
+ val executorMetricsUpdate = {
+ // Use custom accum ID for determinism
+ val accumUpdates =
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
+ .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) }
+ SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
+ }
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -142,7 +145,7 @@ class JsonProtocolSuite extends SparkFunSuite {
"Some exception")
val fetchMetadataFailed = new MetadataFetchFailedException(17,
19, "metadata Fetch failed exception").toTaskEndReason
- val exceptionFailure = new ExceptionFailure(exception, None)
+ val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo])
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
testTaskEndReason(fetchFailed)
@@ -166,9 +169,8 @@ class JsonProtocolSuite extends SparkFunSuite {
| Backward compatibility tests |
* ============================== */
- test("ExceptionFailure backward compatibility") {
- val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null,
- None, None)
+ test("ExceptionFailure backward compatibility: full stack trace") {
+ val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null, None)
val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
.removeField({ _._1 == "Full Stack Trace" })
assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
@@ -273,14 +275,13 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
}
- test("ShuffleReadMetrics: Local bytes read and time taken backwards compatibility") {
- // Metrics about local shuffle bytes read and local read time were added in 1.3.1.
+ test("ShuffleReadMetrics: Local bytes read backwards compatibility") {
+ // Metrics about local shuffle bytes read were added in 1.3.1.
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
hasHadoopInput = false, hasOutput = false, hasRecords = false)
assert(metrics.shuffleReadMetrics.nonEmpty)
val newJson = JsonProtocol.taskMetricsToJson(metrics)
val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
- .removeField { case (field, _) => field == "Local Read Time" }
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
}
@@ -371,22 +372,76 @@ class JsonProtocolSuite extends SparkFunSuite {
}
test("AccumulableInfo backward compatibility") {
- // "Internal" property of AccumulableInfo were added after 1.5.1.
- val accumulableInfo = makeAccumulableInfo(1)
+ // "Internal" property of AccumulableInfo was added in 1.5.1
+ val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
.removeField({ _._1 == "Internal" })
val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
- assert(false === oldInfo.internal)
+ assert(!oldInfo.internal)
+ // "Count Failed Values" property of AccumulableInfo was added in 2.0.0
+ val oldJson2 = JsonProtocol.accumulableInfoToJson(accumulableInfo)
+ .removeField({ _._1 == "Count Failed Values" })
+ val oldInfo2 = JsonProtocol.accumulableInfoFromJson(oldJson2)
+ assert(!oldInfo2.countFailedValues)
+ }
+
+ test("ExceptionFailure backward compatibility: accumulator updates") {
+ // "Task Metrics" was replaced with "Accumulator Updates" in 2.0.0. For older event logs,
+ // we should still be able to fall back to constructing the accumulator updates from the
+ // "Task Metrics" field, if it exists.
+ val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
+ val tmJson = JsonProtocol.taskMetricsToJson(tm)
+ val accumUpdates = tm.accumulatorUpdates()
+ val exception = new SparkException("sentimental")
+ val exceptionFailure = new ExceptionFailure(exception, accumUpdates)
+ val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure)
+ val tmFieldJson: JValue = "Task Metrics" -> tmJson
+ val oldExceptionFailureJson: JValue =
+ exceptionFailureJson.removeField { _._1 == "Accumulator Updates" }.merge(tmFieldJson)
+ val oldExceptionFailure =
+ JsonProtocol.taskEndReasonFromJson(oldExceptionFailureJson).asInstanceOf[ExceptionFailure]
+ assert(exceptionFailure.className === oldExceptionFailure.className)
+ assert(exceptionFailure.description === oldExceptionFailure.description)
+ assertSeqEquals[StackTraceElement](
+ exceptionFailure.stackTrace, oldExceptionFailure.stackTrace, assertStackTraceElementEquals)
+ assert(exceptionFailure.fullStackTrace === oldExceptionFailure.fullStackTrace)
+ assertSeqEquals[AccumulableInfo](
+ exceptionFailure.accumUpdates, oldExceptionFailure.accumUpdates, (x, y) => x == y)
}
- /** -------------------------- *
- | Helper test running methods |
- * --------------------------- */
+ test("AccumulableInfo value de/serialization") {
+ import InternalAccumulator._
+ val blocks = Seq[(BlockId, BlockStatus)](
+ (TestBlockId("meebo"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
+ (TestBlockId("feebo"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
+ val blocksJson = JArray(blocks.toList.map { case (id, status) =>
+ ("Block ID" -> id.toString) ~
+ ("Status" -> JsonProtocol.blockStatusToJson(status))
+ })
+ testAccumValue(Some(RESULT_SIZE), 3L, JInt(3))
+ testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2))
+ testAccumValue(Some(input.READ_METHOD), "aka", JString("aka"))
+ testAccumValue(Some(UPDATED_BLOCK_STATUSES), blocks, blocksJson)
+ // For anything else, we just convert the value to a string
+ testAccumValue(Some("anything"), blocks, JString(blocks.toString))
+ testAccumValue(Some("anything"), 123, JString("123"))
+ }
+
+}
+
+
+private[spark] object JsonProtocolSuite extends Assertions {
+ import InternalAccumulator._
+
+ private val jobSubmissionTime = 1421191042750L
+ private val jobCompletionTime = 1421191296660L
+ private val executorAddedTime = 1421458410000L
+ private val executorRemovedTime = 1421458922000L
private def testEvent(event: SparkListenerEvent, jsonString: String) {
val actualJsonString = compact(render(JsonProtocol.sparkEventToJson(event)))
val newEvent = JsonProtocol.sparkEventFromJson(parse(actualJsonString))
- assertJsonStringEquals(jsonString, actualJsonString)
+ assertJsonStringEquals(jsonString, actualJsonString, event.getClass.getSimpleName)
assertEquals(event, newEvent)
}
@@ -440,11 +495,19 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(info, newInfo)
}
+ private def testAccumValue(name: Option[String], value: Any, expectedJson: JValue): Unit = {
+ val json = JsonProtocol.accumValueToJson(name, value)
+ assert(json === expectedJson)
+ val newValue = JsonProtocol.accumValueFromJson(name, json)
+ val expectedValue = if (name.exists(_.startsWith(METRICS_PREFIX))) value else value.toString
+ assert(newValue === expectedValue)
+ }
+
/** -------------------------------- *
| Util methods for comparing events |
- * --------------------------------- */
+ * --------------------------------- */
- private def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
+ private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
(event1, event2) match {
case (e1: SparkListenerStageSubmitted, e2: SparkListenerStageSubmitted) =>
assert(e1.properties === e2.properties)
@@ -478,14 +541,17 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(e1.executorId === e1.executorId)
case (e1: SparkListenerExecutorMetricsUpdate, e2: SparkListenerExecutorMetricsUpdate) =>
assert(e1.execId === e2.execId)
- assertSeqEquals[(Long, Int, Int, TaskMetrics)](e1.taskMetrics, e2.taskMetrics, (a, b) => {
- val (taskId1, stageId1, stageAttemptId1, metrics1) = a
- val (taskId2, stageId2, stageAttemptId2, metrics2) = b
- assert(taskId1 === taskId2)
- assert(stageId1 === stageId2)
- assert(stageAttemptId1 === stageAttemptId2)
- assertEquals(metrics1, metrics2)
- })
+ assertSeqEquals[(Long, Int, Int, Seq[AccumulableInfo])](
+ e1.accumUpdates,
+ e2.accumUpdates,
+ (a, b) => {
+ val (taskId1, stageId1, stageAttemptId1, updates1) = a
+ val (taskId2, stageId2, stageAttemptId2, updates2) = b
+ assert(taskId1 === taskId2)
+ assert(stageId1 === stageId2)
+ assert(stageAttemptId1 === stageAttemptId2)
+ assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
+ })
case (e1, e2) =>
assert(e1 === e2)
case _ => fail("Events don't match in types!")
@@ -544,7 +610,6 @@ class JsonProtocolSuite extends SparkFunSuite {
}
private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
- assert(metrics1.hostname === metrics2.hostname)
assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime)
assert(metrics1.resultSize === metrics2.resultSize)
assert(metrics1.jvmGCTime === metrics2.jvmGCTime)
@@ -601,7 +666,7 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(r1.description === r2.description)
assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
assert(r1.fullStackTrace === r2.fullStackTrace)
- assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
+ assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b))
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
@@ -637,10 +702,16 @@ class JsonProtocolSuite extends SparkFunSuite {
assertStackTraceElementEquals)
}
- private def assertJsonStringEquals(json1: String, json2: String) {
+ private def assertJsonStringEquals(expected: String, actual: String, metadata: String) {
val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
- assert(formatJsonString(json1) === formatJsonString(json2),
- s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}")
+ if (formatJsonString(expected) != formatJsonString(actual)) {
+ // scalastyle:off
+ // This prints something useful if the JSON strings don't match
+ println("=== EXPECTED ===\n" + pretty(parse(expected)) + "\n")
+ println("=== ACTUAL ===\n" + pretty(parse(actual)) + "\n")
+ // scalastyle:on
+ throw new TestFailedException(s"$metadata JSON did not equal", 1)
+ }
}
private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) {
@@ -699,7 +770,7 @@ class JsonProtocolSuite extends SparkFunSuite {
/** ----------------------------------- *
| Util methods for constructing events |
- * ------------------------------------ */
+ * ------------------------------------ */
private val properties = {
val p = new Properties
@@ -746,8 +817,12 @@ class JsonProtocolSuite extends SparkFunSuite {
taskInfo
}
- private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo =
- AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal)
+ private def makeAccumulableInfo(
+ id: Int,
+ internal: Boolean = false,
+ countFailedValues: Boolean = false): AccumulableInfo =
+ new AccumulableInfo(id, Some(s"Accumulable$id"), Some(s"delta$id"), Some(s"val$id"),
+ internal, countFailedValues)
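
Because both new parameters are defaulted, existing call sites of makeAccumulableInfo keep compiling, and the internal entries in the fixtures below correspond to calls such as makeAccumulableInfo(3, internal = true). A standalone restatement of what the helper builds (a sketch only: it assumes compilation inside the org.apache.spark source tree, since AccumulableInfo's constructor is private[spark]):

package org.apache.spark.util

import org.apache.spark.scheduler.AccumulableInfo

object MakeAccumulableInfoSketch {
  // Mirrors the helper above: name, update and value are derived from the id,
  // and the two flags map to the "Internal" and "Count Failed Values" JSON keys.
  def make(id: Int, internal: Boolean = false, countFailedValues: Boolean = false): AccumulableInfo =
    new AccumulableInfo(id, Some(s"Accumulable$id"), Some(s"delta$id"), Some(s"val$id"),
      internal, countFailedValues)

  // "Accumulable3" in the fixtures: "Internal": true, "Count Failed Values": false.
  val acc3: AccumulableInfo = make(3, internal = true)
}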
/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
@@ -764,7 +839,6 @@ class JsonProtocolSuite extends SparkFunSuite {
hasOutput: Boolean,
hasRecords: Boolean = true) = {
val t = new TaskMetrics
- t.setHostname("localhost")
t.setExecutorDeserializeTime(a)
t.setExecutorRunTime(b)
t.setResultSize(c)
@@ -774,7 +848,7 @@ class JsonProtocolSuite extends SparkFunSuite {
if (hasHadoopInput) {
val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
- inputMetrics.incBytesRead(d + e + f)
+ inputMetrics.setBytesRead(d + e + f)
inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
} else {
val sr = t.registerTempShuffleReadMetrics()
@@ -794,7 +868,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val sw = t.registerShuffleWriteMetrics()
sw.incBytesWritten(a + b + c)
sw.incWriteTime(b + c + d)
- sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
+ sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
}
// Make at most 6 blocks
t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i =>
@@ -826,14 +900,16 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -881,14 +957,16 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -919,21 +997,24 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -962,21 +1043,24 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -1011,26 +1095,28 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| },
| "Task Metrics": {
- | "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Result Size": 500,
@@ -1044,7 +1130,7 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Fetch Wait Time": 900,
| "Remote Bytes Read": 1000,
| "Local Bytes Read": 1100,
- | "Total Records Read" : 10
+ | "Total Records Read": 10
| },
| "Shuffle Write Metrics": {
| "Shuffle Bytes Written": 1200,
@@ -1098,26 +1184,28 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| },
| "Task Metrics": {
- | "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Result Size": 500,
@@ -1182,26 +1270,28 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3",
- | "Internal": true
+ | "Internal": true,
+ | "Count Failed Values": false
| }
| ]
| },
| "Task Metrics": {
- | "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Result Size": 500,
@@ -1273,17 +1363,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -1331,17 +1423,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -1405,17 +1499,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| },
@@ -1495,17 +1591,19 @@ class JsonProtocolSuite extends SparkFunSuite {
| "Accumulables": [
| {
| "ID": 2,
- | "Name": " Accumulable 2",
+ | "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| },
| {
| "ID": 1,
- | "Name": " Accumulable 1",
+ | "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1",
- | "Internal": false
+ | "Internal": false,
+ | "Count Failed Values": false
| }
| ]
| }
@@ -1657,51 +1755,208 @@ class JsonProtocolSuite extends SparkFunSuite {
"""
private val executorMetricsUpdateJsonString =
- s"""
- |{
- | "Event": "SparkListenerExecutorMetricsUpdate",
- | "Executor ID": "exec3",
- | "Metrics Updated": [
- | {
- | "Task ID": 1,
- | "Stage ID": 2,
- | "Stage Attempt ID": 3,
- | "Task Metrics": {
- | "Host Name": "localhost",
- | "Executor Deserialize Time": 300,
- | "Executor Run Time": 400,
- | "Result Size": 500,
- | "JVM GC Time": 600,
- | "Result Serialization Time": 700,
- | "Memory Bytes Spilled": 800,
- | "Disk Bytes Spilled": 0,
- | "Input Metrics": {
- | "Data Read Method": "Hadoop",
- | "Bytes Read": 2100,
- | "Records Read": 21
- | },
- | "Output Metrics": {
- | "Data Write Method": "Hadoop",
- | "Bytes Written": 1200,
- | "Records Written": 12
- | },
- | "Updated Blocks": [
- | {
- | "Block ID": "rdd_0_0",
- | "Status": {
- | "Storage Level": {
- | "Use Disk": true,
- | "Use Memory": true,
- | "Deserialized": false,
- | "Replication": 2
- | },
- | "Memory Size": 0,
- | "Disk Size": 0
- | }
- | }
- | ]
- | }
- | }]
- |}
- """.stripMargin
+ s"""
+ |{
+ | "Event": "SparkListenerExecutorMetricsUpdate",
+ | "Executor ID": "exec3",
+ | "Metrics Updated": [
+ | {
+ | "Task ID": 1,
+ | "Stage ID": 2,
+ | "Stage Attempt ID": 3,
+ | "Accumulator Updates": [
+ | {
+ | "ID": 0,
+ | "Name": "$EXECUTOR_DESERIALIZE_TIME",
+ | "Update": 300,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 1,
+ | "Name": "$EXECUTOR_RUN_TIME",
+ | "Update": 400,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 2,
+ | "Name": "$RESULT_SIZE",
+ | "Update": 500,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 3,
+ | "Name": "$JVM_GC_TIME",
+ | "Update": 600,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 4,
+ | "Name": "$RESULT_SERIALIZATION_TIME",
+ | "Update": 700,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 5,
+ | "Name": "$MEMORY_BYTES_SPILLED",
+ | "Update": 800,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 6,
+ | "Name": "$DISK_BYTES_SPILLED",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 7,
+ | "Name": "$PEAK_EXECUTION_MEMORY",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 8,
+ | "Name": "$UPDATED_BLOCK_STATUSES",
+ | "Update": [
+ | {
+ |              "Block ID": "rdd_0_0",
+ |              "Status": {
+ |                "Storage Level": {
+ |                  "Use Disk": true,
+ |                  "Use Memory": true,
+ |                  "Deserialized": false,
+ |                  "Replication": 2
+ |                },
+ |                "Memory Size": 0,
+ |                "Disk Size": 0
+ | }
+ | }
+ | ],
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 9,
+ | "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 10,
+ | "Name": "${shuffleRead.LOCAL_BLOCKS_FETCHED}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 11,
+ | "Name": "${shuffleRead.REMOTE_BYTES_READ}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 12,
+ | "Name": "${shuffleRead.LOCAL_BYTES_READ}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 13,
+ | "Name": "${shuffleRead.FETCH_WAIT_TIME}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 14,
+ | "Name": "${shuffleRead.RECORDS_READ}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 15,
+ | "Name": "${shuffleWrite.BYTES_WRITTEN}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 16,
+ | "Name": "${shuffleWrite.RECORDS_WRITTEN}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 17,
+ | "Name": "${shuffleWrite.WRITE_TIME}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 18,
+ | "Name": "${input.READ_METHOD}",
+ | "Update": "Hadoop",
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 19,
+ | "Name": "${input.BYTES_READ}",
+ | "Update": 2100,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 20,
+ | "Name": "${input.RECORDS_READ}",
+ | "Update": 21,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 21,
+ | "Name": "${output.WRITE_METHOD}",
+ | "Update": "Hadoop",
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 22,
+ | "Name": "${output.BYTES_WRITTEN}",
+ | "Update": 1200,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 23,
+ | "Name": "${output.RECORDS_WRITTEN}",
+ | "Update": 12,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | },
+ | {
+ | "ID": 24,
+ | "Name": "$TEST_ACCUM",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ | }
+ | ]
+ | }
+ | ]
+ |}
+ """.stripMargin
}
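
The new fixture above replaces the removed per-task TaskMetrics block with a flat list of accumulator updates: the event now carries one (task ID, stage ID, stage attempt ID, accumulator updates) tuple per running task, matching the assertSeqEquals comparison earlier in the suite. A minimal construction sketch (hedged: it assumes compilation inside the org.apache.spark source tree, and uses the constant referenced as $EXECUTOR_RUN_TIME in the fixture):

package org.apache.spark.util

import org.apache.spark.InternalAccumulator
import org.apache.spark.scheduler.{AccumulableInfo, SparkListenerExecutorMetricsUpdate}

object ExecutorMetricsUpdateSketch {
  // An in-flight update: executors send only the partial update, so `value` is
  // None, which is why the fixture entries above carry no "Value" field. The two
  // trailing flags are internal = true and countFailedValues = true, as in the JSON.
  private val runTimeUpdate =
    new AccumulableInfo(1, Some(InternalAccumulator.EXECUTOR_RUN_TIME), Some(400L), None,
      true, true)

  // Executor "exec3" reporting on task 1 of stage 2, attempt 3, as in the fixture.
  val event: SparkListenerExecutorMetricsUpdate =
    SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, Seq(runTimeUpdate))))
}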