aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorDhruve Ashar <dhruveashar@gmail.com>2016-07-26 13:23:33 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-07-26 13:23:33 -0500
commit0b71d9ae0804b0394e4abd02c7cebf52a9102216 (patch)
tree81f2506d2c96e7756e4eda9a288b58dcbdd7ca21 /streaming/src
parent0869b3a5f028b64c2da511e70b02ab42f65fc949 (diff)
downloadspark-0b71d9ae0804b0394e4abd02c7cebf52a9102216.tar.gz
spark-0b71d9ae0804b0394e4abd02c7cebf52a9102216.tar.bz2
spark-0b71d9ae0804b0394e4abd02c7cebf52a9102216.zip
[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable
## What changes were proposed in this pull request? This change adds a new configuration entry to specify the size of the spark listener bus event queue. The value for this config ("spark.scheduler.listenerbus.eventqueue.size") is set to a default to 10000. Note: I haven't currently documented the configuration entry. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer JIRA for more details. ## How was this patch tested? Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab. Author: Dhruve Ashar <dhruveashar@gmail.com> Closes #14269 from dhruve/bug/SPARK-15703.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala5
1 files changed, 4 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index e97427991b..feb5c30c6a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite
extends SparkFunSuite
with BeforeAndAfter
with Matchers
+ with LocalSparkContext
with Logging {
import WriteAheadLogBasedBlockHandler._
@@ -77,8 +78,10 @@ class ReceivedBlockHandlerSuite
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ sc = new SparkContext("local", "test", conf)
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
storageLevel = StorageLevel.MEMORY_ONLY_SER
blockManager = createBlockManager(blockManagerSize, conf)