diff options
author | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-09 18:20:48 -0700 |
---|---|---|
committer | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-09 18:20:48 -0700 |
commit | 7810a76512a0fe07a81549e7d0cb01463bbefeac (patch) | |
tree | 001ed1a30856205437d0034e1901cc3cf338ae31 /core/src | |
parent | 44ca8629d800944daf0f58fb83bcd3ba894455eb (diff) | |
download | spark-7810a76512a0fe07a81549e7d0cb01463bbefeac.tar.gz spark-7810a76512a0fe07a81549e7d0cb01463bbefeac.tar.bz2 spark-7810a76512a0fe07a81549e7d0cb01463bbefeac.zip |
Only print event queue full error message once
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala index 43864c711f..e5fade26b0 100644 --- a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala +++ b/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala @@ -31,6 +31,7 @@ class SparkListenerEventProcessor() extends Logging { * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
+ private var queueFullErrorMessageLogged = false
new Thread("SparkListenerEventProcessor") {
setDaemon(true)
@@ -62,10 +63,11 @@ class SparkListenerEventProcessor() extends Logging { def addEvent(event: SparkListenerEvents) {
val eventAdded = eventQueue.offer(event)
- if (!eventAdded) {
+ if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
+ queueFullErrorMessageLogged = true
}
}
}
|