From 5519760e0fe7d52170b38a52ce3d670d158e2aba Mon Sep 17 00:00:00 2001
From: Joan
Date: Thu, 24 Mar 2016 09:47:44 +0000
Subject: [SPARK-2208] Fix for local metrics tests can fail on fast machines

## What changes were proposed in this pull request?

A fix for local metrics tests that can fail on fast machines.
This is probably what is suggested here #3380 by aarondav?

## How was this patch tested?

CI Tests

Cheers

Author: Joan

Closes #11747 from joan38/SPARK-2208-Local-metrics-tests.
---
 .../spark/scheduler/SparkListenerSuite.scala       | 40 ++++++++++++++--------
 1 file changed, 25 insertions(+), 15 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 58d217ffef..f1f9b69c89 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,6 +26,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -215,28 +219,24 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("local metrics") {
-    sc = new SparkContext("local", "SparkListenerSuite")
+    val conf = new SparkConf()
+      .setMaster("local").setAppName("SparkListenerSuite")
+      .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
+    sc = new SparkContext(conf)
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
-    // just to make sure some of the tasks take a noticeable amount of time
-    val w = { i: Int =>
-      if (i == 0) {
-        Thread.sleep(100)
-      }
-      i
-    }
 
     val numSlices = 16
-    val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w)
+    val d = sc.parallelize(0 to 1e3.toInt, numSlices)
     d.count()
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     listener.stageInfos.size should be (1)
 
-    val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
-    val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
+    val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
+    val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
     val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
-      w(k) -> (v1.size, v2.size)
+      k -> (v1.size, v2.size)
     }
     d4.setName("A Cogroup")
     d4.collectAsMap()
@@ -255,13 +255,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         taskInfoMetrics.map(_._2.executorDeserializeTime),
         stageInfo + " executorDeserializeTime")
 
-      /* Test is disabled (SEE SPARK-2208)
       if (stageInfo.rddInfos.exists(_.name == d4.name)) {
         checkNonZeroAvg(
           taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
           stageInfo + " fetchWaitTime")
       }
-      */
 
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0L)
@@ -337,7 +335,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         listener.wait(remainingWait)
         remainingWait = finishTime - System.currentTimeMillis
       }
-      assert(!listener.startedTasks.isEmpty)
+      assert(listener.startedTasks.nonEmpty)
     }
 
     f.cancel()
@@ -476,3 +474,15 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
+
+/** Slow ShuffleManager to simulate tasks that take a noticeable amount of time */
+private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
+
+  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
+
+    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+      Thread.sleep(10)
+      super.getBlockData(blockId)
+    }
+  }
+}
--
cgit v1.2.3
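A rough, self-contained sketch of the mechanism this patch relies on: instead of sleeping inside the mapped function, a deliberately slow `ShuffleManager` is registered through the `spark.shuffle.manager` setting, so every shuffle block fetch takes a noticeable amount of time. `SlowShuffleManager` below mirrors the class added in the diff; the driver object `SlowShuffleExample` and its `reduceByKey` job are hypothetical and only illustrate how the configuration is wired up, assuming a Spark version contemporary with this patch (where `SortShuffleManager` and `IndexShuffleBlockResolver` take a `SparkConf`).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.ShuffleBlockId

// Same idea as the SlowShuffleManager added by the patch: delay each shuffle block fetch.
class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
      Thread.sleep(10) // make every fetch take a noticeable amount of time
      super.getBlockData(blockId)
    }
  }
}

// Hypothetical driver showing how the slow manager is plugged in (illustration only).
object SlowShuffleExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("SlowShuffleExample")
      // Register the custom manager by class name, exactly as the test now does.
      .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
    val sc = new SparkContext(conf)

    // Any job with a shuffle now goes through the delayed getBlockData calls.
    val counts = sc.parallelize(0 until 1000, 16)
      .map(i => (i % 5, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    println(counts)

    sc.stop()
  }
}
```

Because the delay lives in the shuffle layer rather than in user code, the shuffle read wait time stays observable regardless of how quickly the machine executes the map tasks, which is what lets the previously disabled fetchWaitTime assertion be re-enabled.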