author    Dhruve Ashar <dhruveashar@gmail.com>    2016-07-26 13:23:33 -0500
committer Tom Graves <tgraves@yahoo-inc.com>      2016-07-26 13:23:33 -0500
commit    0b71d9ae0804b0394e4abd02c7cebf52a9102216 (patch)
tree      81f2506d2c96e7756e4eda9a288b58dcbdd7ca21 /core/src/test/scala
parent    0869b3a5f028b64c2da511e70b02ab42f65fc949 (diff)
[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable
## What changes were proposed in this pull request?

This change adds a new configuration entry to specify the size of the Spark listener bus event queue. The value for this config ("spark.scheduler.listenerbus.eventqueue.size") defaults to 10000.

Note: I haven't currently documented the configuration entry. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer to the JIRA for more details.

## How was this patch tested?

Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #14269 from dhruve/bug/SPARK-15703.
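A minimal sketch of how a user might set the new config. Only the key name ("spark.scheduler.listenerbus.eventqueue.size") and its 10000 default come from this commit; the master, app name, and chosen value below are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values; the config key and its 10000 default are
// from the commit message above, everything else is an assumption.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("listener-bus-queue-demo")
  // Raise the listener bus event queue capacity above the default 10000.
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")

val sc = new SparkContext(conf)
// ... run jobs; the effective value is visible in the WebUI Environment tab.
sc.stop()
```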
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala         | 30
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala |  9
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala            |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala           | 11
5 files changed, 35 insertions(+), 25 deletions(-)
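As the hunks below show, every test suite adapts to the same LiveListenerBus API change: the SparkContext is now passed at construction time instead of at start time. A before/after fragment distilled from the diff (assumes an existing SparkContext `sc`):

```scala
import org.apache.spark.scheduler.LiveListenerBus

// Before this change: context supplied when starting the bus.
val oldBus = new LiveListenerBus
oldBus.start(sc)

// After this change: context supplied at construction, presumably so the
// bus can read the new queue-size config from the context's SparkConf.
val newBus = new LiveListenerBus(sc)
newBus.start()
```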
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index c4c80b5b57..7f4859206e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
- val listenerBus = new LiveListenerBus
+ val listenerBus = new LiveListenerBus(sc)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
- listenerBus.start(sc)
+ listenerBus.start()
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 5ba67afc0c..e8a88d4909 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val jobCompletionTime = 1421191296660L
test("don't call sc.stop in listener") {
- sc = new SparkContext("local", "SparkListenerSuite")
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val listener = new SparkContextStoppingListener(sc)
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(listener)
// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
@@ -52,8 +52,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("basic creation and shutdown of LiveListenerBus") {
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val counter = new BasicJobCounter
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(counter)
// Listener bus hasn't started yet, so posting events should not increment counter
@@ -61,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
@@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Listener bus must not be started twice
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
- bus.start(sc)
- bus.start(sc)
+ val bus = new LiveListenerBus(sc)
+ bus.start()
+ bus.start()
}
// ... or stopped before starting
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.stop()
}
}
@@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
drained = true
}
}
-
- val bus = new LiveListenerBus
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+ val bus = new LiveListenerBus(sc)
val blockingListener = new BlockingListener
bus.addListener(blockingListener)
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
@@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val badListener = new BadListener
val jobCounter1 = new BasicJobCounter
val jobCounter2 = new BasicJobCounter
- val bus = new LiveListenerBus
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+ val bus = new LiveListenerBus(sc)
// Propagate events to bad listener first
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
- bus.start(sc)
+ bus.start()
// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 31687e6147..b9e3a364ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
/** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class BlockManagerReplicationSuite extends SparkFunSuite
+ with Matchers
+ with BeforeAndAfter
+ with LocalSparkContext {
private val conf = new SparkConf(false).set("spark.app.id", "test")
private var rpcEnv: RpcEnv = null
@@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")
+ sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
allStores.clear()
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 8077a1b941..87c8628ce9 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
- with PrivateMethodTester with ResetSystemProperties {
+ with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
import BlockManagerSuite._
@@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 411a0ddebe..f6c8418ba3 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,15 +19,14 @@ package org.apache.spark.ui.storage
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
/**
* Test various functionality in the StorageListener that supports the StorageTab.
*/
-class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
private var bus: LiveListenerBus = _
private var storageStatusListener: StorageStatusListener = _
private var storageListener: StorageListener = _
@@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
private val bm1 = BlockManagerId("big", "dog", 1)
before {
- bus = new LiveListenerBus
- storageStatusListener = new StorageStatusListener(new SparkConf())
+ val conf = new SparkConf()
+ sc = new SparkContext("local", "test", conf)
+ bus = new LiveListenerBus(sc)
+ storageStatusListener = new StorageStatusListener(conf)
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)