 core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala    | 38 ++++++++++++++++----------
 core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala |  5 +++--
 2 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index cbac4c13ca..dec3316bf7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import org.apache.spark.Logging
@@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
+
+ // A counter that represents the number of events produced and consumed in the queue
+ private val eventLock = new Semaphore(0)
+
private val listenerThread = new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
- val event = eventQueue.take
- if (event == SparkListenerShutdown) {
- // Get out of the while loop and shutdown the daemon thread
- return
+ eventLock.acquire()
+ // Atomically remove and process this event
+ LiveListenerBus.this.synchronized {
+ val event = eventQueue.poll
+ if (event == SparkListenerShutdown) {
+ // Get out of the while loop and shutdown the daemon thread
+ return
+ }
+ Option(event).foreach(postToAll)
}
- postToAll(event)
}
}
}
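The hunk above replaces the blocking eventQueue.take with a semaphore that counts queued events: post() releases one permit per successfully enqueued event, and the listener thread acquires a permit before polling, so poll() finds an event whenever a permit was available. A minimal standalone sketch of this producer-consumer pattern (names here are illustrative, not Spark's):

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

object CountedQueueSketch {
  private val queue = new LinkedBlockingQueue[String](100)
  private val permits = new Semaphore(0)  // counts events currently in the queue

  // Producer: release a permit only if the event actually made it into the queue,
  // so the permit count never exceeds the number of queued events.
  def post(event: String) {
    if (queue.offer(event)) {
      permits.release()
    }
  }

  // Consumer: acquiring a permit first means poll() is guaranteed to find an event.
  private val consumer = new Thread("consumer") {
    setDaemon(true)
    override def run() {
      while (true) {
        permits.acquire()
        val event = queue.poll()
        println(s"processing $event")
      }
    }
  }

  def start() { consumer.start() }
}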
@@ -67,7 +75,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
def post(event: SparkListenerEvent) {
val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
+ if (eventAdded) {
+ eventLock.release()
+ } else if (!queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
@@ -76,13 +86,13 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
/**
- * Waits until there are no more events in the queue, or until the specified time has elapsed.
- * Used for testing only. Returns true if the queue has emptied and false is the specified time
+ * For testing only. Wait until there are no more events in the queue, or until the specified
+ * time has elapsed. Return true if the queue has emptied and false if the specified time
* elapsed before the queue emptied.
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty) {
+ while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
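waitUntilEmpty is a deadline poll: it rechecks the (now synchronized) emptiness condition until it holds or the timeout expires, sleeping briefly between checks to avoid busy-waiting. The same pattern extracted as a generic helper, for clarity only (not part of Spark):

def pollUntil(timeoutMillis: Long)(condition: => Boolean): Boolean = {
  val finishTime = System.currentTimeMillis + timeoutMillis
  while (!condition) {
    if (System.currentTimeMillis > finishTime) {
      return false
    }
    // Brief sleep between checks; this helper, like the real method, is test-only.
    Thread.sleep(10)
  }
  true
}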
@@ -93,6 +103,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
true
}
+ /**
+ * Return whether the event queue is empty.
+ *
+ * The use of synchronized here guarantees that all events that once belonged to this queue
+ * have already been processed by all attached listeners, if this returns true.
+ */
+ def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
+
def stop() {
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
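The new queueIsEmpty closes a race that a bare eventQueue.isEmpty could not: the listener thread polls an event out (making the queue empty) before postToAll finishes, so an unsynchronized check could report "empty" while a listener is still running. Because poll-and-process and the emptiness check share the bus's monitor, an empty result now implies all dequeued events were fully processed. A minimal sketch of the guarantee, with hypothetical names:

class Bus {
  private val queue = new java.util.concurrent.LinkedBlockingQueue[String]()

  // Consumer side: dequeue and process under the same lock, as the listener
  // thread above does inside LiveListenerBus.this.synchronized.
  def drainOne(process: String => Unit) = synchronized {
    Option(queue.poll()).foreach(process)
  }

  // If this returns true, no event is sitting in the queue AND no event is
  // mid-processing, because drainOne cannot be inside process() concurrently.
  def queueIsEmpty: Boolean = synchronized { queue.isEmpty }
}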
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index ba048ced32..4e9fd07e68 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.SparkContext._
import org.apache.spark.executor.TaskMetrics
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
- with BeforeAndAfter with BeforeAndAfterAll {
+ with BeforeAndAfter with BeforeAndAfterAll {
+
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
@@ -37,7 +38,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
sc = new SparkContext("local", "SparkListenerSuite")
}
- override def afterAll {
+ override def afterAll() {
System.clearProperty("spark.akka.frameSize")
}
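With these changes, a test can rely on waitUntilEmpty returning true only after every attached listener has finished. A hedged sketch of such a test body (assumes the 1.0-era SparkListenerJobEnd(jobId, jobResult) constructor; the slow listener and assertions are illustrative, not from this patch):

class SlowJobListener extends SparkListener {
  @volatile var done = false
  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    Thread.sleep(100)  // deliberately slow listener
    done = true
  }
}

val bus = new LiveListenerBus
val listener = new SlowJobListener
bus.addListener(listener)
bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded))
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(listener.done)  // holds because queueIsEmpty is checked under the bus lock
bus.stop()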