aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
index 43864c711f..e5fade26b0 100644
--- a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
@@ -31,6 +31,7 @@ class SparkListenerEventProcessor() extends Logging {
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
+ private var queueFullErrorMessageLogged = false
new Thread("SparkListenerEventProcessor") {
setDaemon(true)
@@ -62,10 +63,11 @@ class SparkListenerEventProcessor() extends Logging {
def addEvent(event: SparkListenerEvents) {
val eventAdded = eventQueue.offer(event)
- if (!eventAdded) {
+ if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
+ queueFullErrorMessageLogged = true
}
}
}