diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-09-26 10:44:35 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-09-26 10:44:35 -0700 |
commit | bde85f8b70138a51052b613664facbc981378c38 (patch) | |
tree | 5b6aa930534722500dfc6ac98d7aa5f01c403b61 /core/src/main/scala | |
parent | f234b7cd795dd9baa3feff541c211b4daf39ccc6 (diff) | |
download | spark-bde85f8b70138a51052b613664facbc981378c38.tar.gz spark-bde85f8b70138a51052b613664facbc981378c38.tar.bz2 spark-bde85f8b70138a51052b613664facbc981378c38.zip |
[SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus
## What changes were proposed in this pull request?
Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #15220 from zsxwing/SPARK-17649.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index bfa3c408f2..5533f7b1f2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.DynamicVariable @@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa // Indicate if `stop()` is called private val stopped = new AtomicBoolean(false) + /** A counter for dropped events. It will be reset every time we log it. */ + private val droppedEventsCounter = new AtomicLong(0L) + + /** When `droppedEventsCounter` was logged last time in milliseconds. */ + @volatile private var lastReportTimestamp = 0L + // Indicate if we are processing some event // Guarded by `self` private var processingEvent = false @@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa eventLock.release() } else { onDropEvent(event) + droppedEventsCounter.incrementAndGet() + } + + val droppedEvents = droppedEventsCounter.get + if (droppedEvents > 0) { + // Don't log too frequently + if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { + // There may be multiple threads trying to decrease droppedEventsCounter. + // Use "compareAndSet" to make sure only one thread can win. + // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and + // then that thread will update it. + if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { + val prevLastReportTimestamp = lastReportTimestamp + lastReportTimestamp = System.currentTimeMillis() + logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + + new java.util.Date(prevLastReportTimestamp)) + } + } } } |