aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-03-24 17:27:20 +0000
committerSean Owen <sowen@cloudera.com>2016-03-24 17:27:20 +0000
commit342079dc45425309798d6082cccef86858f08a77 (patch)
tree07463b657cf83cf714b59076f4ef5e18d6a589be
parent5519760e0fe7d52170b38a52ce3d670d158e2aba (diff)
downloadspark-342079dc45425309798d6082cccef86858f08a77.tar.gz
spark-342079dc45425309798d6082cccef86858f08a77.tar.bz2
spark-342079dc45425309798d6082cccef86858f08a77.zip
Revert "[SPARK-2208] Fix for local metrics tests can fail on fast machines". The test appears to still be flaky after this change, or more flaky.
This reverts commit 5519760e0fe7d52170b38a52ce3d670d158e2aba.
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala40
1 files changed, 15 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index f1f9b69c89..58d217ffef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,10 +26,6 @@ import org.scalatest.Matchers
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.shuffle.IndexShuffleBlockResolver
-import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -219,24 +215,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("local metrics") {
- val conf = new SparkConf()
- .setMaster("local").setAppName("SparkListenerSuite")
- .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
- sc = new SparkContext(conf)
+ sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
+ // just to make sure some of the tasks take a noticeable amount of time
+ val w = { i: Int =>
+ if (i == 0) {
+ Thread.sleep(100)
+ }
+ i
+ }
val numSlices = 16
- val d = sc.parallelize(0 to 1e3.toInt, numSlices)
+ val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w)
d.count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be (1)
- val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
- val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
+ val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
+ val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
- k -> (v1.size, v2.size)
+ w(k) -> (v1.size, v2.size)
}
d4.setName("A Cogroup")
d4.collectAsMap()
@@ -255,11 +255,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
taskInfoMetrics.map(_._2.executorDeserializeTime),
stageInfo + " executorDeserializeTime")
+ /* Test is disabled (SEE SPARK-2208)
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
checkNonZeroAvg(
taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
stageInfo + " fetchWaitTime")
}
+ */
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0L)
@@ -335,7 +337,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.wait(remainingWait)
remainingWait = finishTime - System.currentTimeMillis
}
- assert(listener.startedTasks.nonEmpty)
+ assert(!listener.startedTasks.isEmpty)
}
f.cancel()
@@ -474,15 +476,3 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
var count = 0
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}
-
-/** Slow ShuffleManager to simulate tasks that takes a noticeable amount of time */
-private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
-
- override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
-
- override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
- Thread.sleep(10)
- super.getBlockData(blockId)
- }
- }
-}