diff options
author | Sean Owen <sowen@cloudera.com> | 2016-03-24 17:27:20 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-24 17:27:20 +0000 |
commit | 342079dc45425309798d6082cccef86858f08a77 (patch) | |
tree | 07463b657cf83cf714b59076f4ef5e18d6a589be /core | |
parent | 5519760e0fe7d52170b38a52ce3d670d158e2aba (diff) | |
download | spark-342079dc45425309798d6082cccef86858f08a77.tar.gz spark-342079dc45425309798d6082cccef86858f08a77.tar.bz2 spark-342079dc45425309798d6082cccef86858f08a77.zip |
Revert "[SPARK-2208] Fix for local metrics tests can fail on fast machines". The test appears to still be flaky after this change, or more flaky.
This reverts commit 5519760e0fe7d52170b38a52ce3d670d158e2aba.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 40 |
1 file changed, 15 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index f1f9b69c89..58d217ffef 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -26,10 +26,6 @@ import org.scalatest.Matchers import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.network.buffer.ManagedBuffer -import org.apache.spark.shuffle.IndexShuffleBlockResolver -import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers @@ -219,24 +215,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("local metrics") { - val conf = new SparkConf() - .setMaster("local").setAppName("SparkListenerSuite") - .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName) - sc = new SparkContext(conf) + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) + // just to make sure some of the tasks take a noticeable amount of time + val w = { i: Int => + if (i == 0) { + Thread.sleep(100) + } + i + } val numSlices = 16 - val d = sc.parallelize(0 to 1e3.toInt, numSlices) + val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w) d.count() sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be (1) - val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1") - val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2") + val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1") + val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2") val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) => - k -> (v1.size, v2.size) + w(k) -> (v1.size, v2.size) } d4.setName("A Cogroup") d4.collectAsMap() @@ -255,11 +255,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match taskInfoMetrics.map(_._2.executorDeserializeTime), stageInfo + " executorDeserializeTime") + /* Test is disabled (SEE SPARK-2208) if (stageInfo.rddInfos.exists(_.name == d4.name)) { checkNonZeroAvg( taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime), stageInfo + " fetchWaitTime") } + */ taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0L) @@ -335,7 +337,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match listener.wait(remainingWait) remainingWait = finishTime - System.currentTimeMillis } - assert(listener.startedTasks.nonEmpty) + assert(!listener.startedTasks.isEmpty) } f.cancel() @@ -474,15 +476,3 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene var count = 0 override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1 } - -/** Slow ShuffleManager to simulate tasks that takes a noticeable amount of time */ -private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) { - - override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) { - - override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { - Thread.sleep(10) - super.getBlockData(blockId) - } - } -} |