author    Dhruve Ashar <dhruveashar@gmail.com>  2016-07-26 13:23:33 -0500
committer Tom Graves <tgraves@yahoo-inc.com>    2016-07-26 13:23:33 -0500
commit    0b71d9ae0804b0394e4abd02c7cebf52a9102216 (patch)
tree      81f2506d2c96e7756e4eda9a288b58dcbdd7ca21
parent    0869b3a5f028b64c2da511e70b02ab42f65fc949 (diff)
[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable
## What changes were proposed in this pull request?

This change adds a new configuration entry to specify the size of the Spark listener bus event queue. The value for this config ("spark.scheduler.listenerbus.eventqueue.size") defaults to 10000.

Note: I haven't documented the configuration entry yet. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer to the JIRA for more details.

## How was this patch tested?

Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #14269 from dhruve/bug/SPARK-15703.
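For context, a minimal sketch of how a user could tune the new queue size once this patch lands. The config key, its default of 10000, the `SparkException` on non-positive values, and the WebUI Environment tab check are all from this change; the object name, master URL, and the value 20000 are purely illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ListenerBusQueueDemo {
  def main(args: Array[String]): Unit = {
    // Raise the listener bus event queue capacity above the default of 10000.
    // LiveListenerBus rejects values <= 0 with a SparkException.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("listener-bus-queue-demo") // hypothetical app name
      .set("spark.scheduler.listenerbus.eventqueue.size", "20000")

    val sc = new SparkContext(conf)
    // The effective value is visible in the Spark WebUI Environment tab.
    sc.stop()
  }
}
```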
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                            |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala                 |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala               | 23
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala     |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala            | 30
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala    |  9
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala               |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala              | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala |  5
9 files changed, 60 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6d7f05d217..d48e2b420d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def isStopped: Boolean = stopped.get()
// An asynchronous listener bus for Spark events
- private[spark] val listenerBus = new LiveListenerBus
+ private[spark] val listenerBus = new LiveListenerBus(this)
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
@@ -2148,7 +2148,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
- listenerBus.start(this)
+ listenerBus.start()
_listenerBusStarted = true
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 05dd68300f..ebb21e9efd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -103,4 +103,9 @@ package object config {
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
+
+ private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+ ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+ .intConf
+ .createWithDefault(10000)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 1c21313d1c..bfa3c408f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.util.DynamicVariable
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
/**
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
self =>
import LiveListenerBus._
- private var sparkContext: SparkContext = null
-
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+ private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
+ private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+ private def validateAndGetQueueSize(): Int = {
+ val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+ if (queueSize <= 0) {
+ throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
+ }
+ queueSize
+ }
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
@@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
- * @param sc Used to stop the SparkContext in case the listener thread dies.
*/
- def start(sc: SparkContext): Unit = {
+ def start(): Unit = {
if (started.compareAndSet(false, true)) {
- sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index c4c80b5b57..7f4859206e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
- val listenerBus = new LiveListenerBus
+ val listenerBus = new LiveListenerBus(sc)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
- listenerBus.start(sc)
+ listenerBus.start()
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 5ba67afc0c..e8a88d4909 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val jobCompletionTime = 1421191296660L
test("don't call sc.stop in listener") {
- sc = new SparkContext("local", "SparkListenerSuite")
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val listener = new SparkContextStoppingListener(sc)
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(listener)
// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
@@ -52,8 +52,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("basic creation and shutdown of LiveListenerBus") {
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val counter = new BasicJobCounter
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(counter)
// Listener bus hasn't started yet, so posting events should not increment counter
@@ -61,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
@@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Listener bus must not be started twice
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
- bus.start(sc)
- bus.start(sc)
+ val bus = new LiveListenerBus(sc)
+ bus.start()
+ bus.start()
}
// ... or stopped before starting
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.stop()
}
}
@@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
drained = true
}
}
-
- val bus = new LiveListenerBus
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+ val bus = new LiveListenerBus(sc)
val blockingListener = new BlockingListener
bus.addListener(blockingListener)
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
@@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val badListener = new BadListener
val jobCounter1 = new BasicJobCounter
val jobCounter2 = new BasicJobCounter
- val bus = new LiveListenerBus
+ sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+ val bus = new LiveListenerBus(sc)
// Propagate events to bad listener first
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
- bus.start(sc)
+ bus.start()
// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 31687e6147..b9e3a364ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
/** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class BlockManagerReplicationSuite extends SparkFunSuite
+ with Matchers
+ with BeforeAndAfter
+ with LocalSparkContext {
private val conf = new SparkConf(false).set("spark.app.id", "test")
private var rpcEnv: RpcEnv = null
@@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")
+ sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
allStores.clear()
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 8077a1b941..87c8628ce9 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
- with PrivateMethodTester with ResetSystemProperties {
+ with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
import BlockManagerSuite._
@@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ sc = new SparkContext("local", "test", conf)
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 411a0ddebe..f6c8418ba3 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,15 +19,14 @@ package org.apache.spark.ui.storage
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
/**
* Test various functionality in the StorageListener that supports the StorageTab.
*/
-class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
private var bus: LiveListenerBus = _
private var storageStatusListener: StorageStatusListener = _
private var storageListener: StorageListener = _
@@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
private val bm1 = BlockManagerId("big", "dog", 1)
before {
- bus = new LiveListenerBus
- storageStatusListener = new StorageStatusListener(new SparkConf())
+ val conf = new SparkConf()
+ sc = new SparkContext("local", "test", conf)
+ bus = new LiveListenerBus(sc)
+ storageStatusListener = new StorageStatusListener(conf)
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index e97427991b..feb5c30c6a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite
extends SparkFunSuite
with BeforeAndAfter
with Matchers
+ with LocalSparkContext
with Logging {
import WriteAheadLogBasedBlockHandler._
@@ -77,8 +78,10 @@ class ReceivedBlockHandlerSuite
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
+ sc = new SparkContext("local", "test", conf)
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+ new LiveListenerBus(sc))), conf, true)
storageLevel = StorageLevel.MEMORY_ONLY_SER
blockManager = createBlockManager(blockManagerSize, conf)