aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-03-24 17:27:20 +0000
committerSean Owen <sowen@cloudera.com>2016-03-24 17:27:20 +0000
commit342079dc45425309798d6082cccef86858f08a77 (patch)
tree07463b657cf83cf714b59076f4ef5e18d6a589be /core
parent5519760e0fe7d52170b38a52ce3d670d158e2aba (diff)
downloadspark-342079dc45425309798d6082cccef86858f08a77.tar.gz
spark-342079dc45425309798d6082cccef86858f08a77.tar.bz2
spark-342079dc45425309798d6082cccef86858f08a77.zip
Revert "[SPARK-2208] Fix for local metrics tests can fail on fast machines". The test appears to still be flaky after this change, or more flaky.
This reverts commit 5519760e0fe7d52170b38a52ce3d670d158e2aba.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala40
1 files changed, 15 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index f1f9b69c89..58d217ffef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,10 +26,6 @@ import org.scalatest.Matchers
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.shuffle.IndexShuffleBlockResolver
-import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -219,24 +215,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("local metrics") {
- val conf = new SparkConf()
- .setMaster("local").setAppName("SparkListenerSuite")
- .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
- sc = new SparkContext(conf)
+ sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
+ // just to make sure some of the tasks take a noticeable amount of time
+ val w = { i: Int =>
+ if (i == 0) {
+ Thread.sleep(100)
+ }
+ i
+ }
val numSlices = 16
- val d = sc.parallelize(0 to 1e3.toInt, numSlices)
+ val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w)
d.count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be (1)
- val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
- val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
+ val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
+ val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
- k -> (v1.size, v2.size)
+ w(k) -> (v1.size, v2.size)
}
d4.setName("A Cogroup")
d4.collectAsMap()
@@ -255,11 +255,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
taskInfoMetrics.map(_._2.executorDeserializeTime),
stageInfo + " executorDeserializeTime")
+ /* Test is disabled (SEE SPARK-2208)
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
checkNonZeroAvg(
taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
stageInfo + " fetchWaitTime")
}
+ */
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0L)
@@ -335,7 +337,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.wait(remainingWait)
remainingWait = finishTime - System.currentTimeMillis
}
- assert(listener.startedTasks.nonEmpty)
+ assert(!listener.startedTasks.isEmpty)
}
f.cancel()
@@ -474,15 +476,3 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
var count = 0
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}
-
-/** Slow ShuffleManager to simulate tasks that takes a noticeable amount of time */
-private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {
-
- override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {
-
- override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
- Thread.sleep(10)
- super.getBlockData(blockId)
- }
- }
-}