aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayo@yahoo-inc.com>2013-08-09 18:20:48 -0700
committerKay Ousterhout <kayo@yahoo-inc.com>2013-08-09 18:20:48 -0700
commit7810a76512a0fe07a81549e7d0cb01463bbefeac (patch)
tree001ed1a30856205437d0034e1901cc3cf338ae31 /core
parent44ca8629d800944daf0f58fb83bcd3ba894455eb (diff)
downloadspark-7810a76512a0fe07a81549e7d0cb01463bbefeac.tar.gz
spark-7810a76512a0fe07a81549e7d0cb01463bbefeac.tar.bz2
spark-7810a76512a0fe07a81549e7d0cb01463bbefeac.zip
Only print event queue full error message once
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
index 43864c711f..e5fade26b0 100644
--- a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
@@ -31,6 +31,7 @@ class SparkListenerEventProcessor() extends Logging {
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
+ private var queueFullErrorMessageLogged = false
new Thread("SparkListenerEventProcessor") {
setDaemon(true)
@@ -62,10 +63,11 @@ class SparkListenerEventProcessor() extends Logging {
def addEvent(event: SparkListenerEvents) {
val eventAdded = eventQueue.offer(event)
- if (!eventAdded) {
+ if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
+ queueFullErrorMessageLogged = true
}
}
}