aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-09-26 10:44:35 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-09-26 10:44:35 -0700
commitbde85f8b70138a51052b613664facbc981378c38 (patch)
tree5b6aa930534722500dfc6ac98d7aa5f01c403b61 /core/src/main/scala
parentf234b7cd795dd9baa3feff541c211b4daf39ccc6 (diff)
downloadspark-bde85f8b70138a51052b613664facbc981378c38.tar.gz
spark-bde85f8b70138a51052b613664facbc981378c38.tar.bz2
spark-bde85f8b70138a51052b613664facbc981378c38.zip
[SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus
## What changes were proposed in this pull request? Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #15220 from zsxwing/SPARK-17649.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala26
1 files changed, 25 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index bfa3c408f2..5533f7b1f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.util.DynamicVariable
@@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)
+ /** A counter for dropped events. It will be reset every time we log it. */
+ private val droppedEventsCounter = new AtomicLong(0L)
+
+ /** When `droppedEventsCounter` was logged last time in milliseconds. */
+ @volatile private var lastReportTimestamp = 0L
+
// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
@@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
eventLock.release()
} else {
onDropEvent(event)
+ droppedEventsCounter.incrementAndGet()
+ }
+
+ val droppedEvents = droppedEventsCounter.get
+ if (droppedEvents > 0) {
+ // Don't log too frequently
+ if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
+ // There may be multiple threads trying to decrease droppedEventsCounter.
+ // Use "compareAndSet" to make sure only one thread can win.
+ // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
+ // then that thread will update it.
+ if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
+ val prevLastReportTimestamp = lastReportTimestamp
+ lastReportTimestamp = System.currentTimeMillis()
+ logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
+ new java.util.Date(prevLastReportTimestamp))
+ }
+ }
}
}