aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-04-24 20:18:15 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-24 20:18:15 -0700
commitee6f7e22a449837864072e3cd2b6696005f134f1 (patch)
tree84e723f5443a5056bb691e29393962570ada7dfa
parent80429f3e2ab786d103297652922c3d8da3cf5a01 (diff)
downloadspark-ee6f7e22a449837864072e3cd2b6696005f134f1.tar.gz
spark-ee6f7e22a449837864072e3cd2b6696005f134f1.tar.bz2
spark-ee6f7e22a449837864072e3cd2b6696005f134f1.zip
[SPARK-1615] Synchronize accesses to the LiveListenerBus' event queue
Original poster is @zsxwing, who reported this bug in #516. Much of SparkListenerSuite relies on LiveListenerBus's `waitUntilEmpty()` method. As the name suggests, this waits until the event queue is empty. However, the following race condition could happen: (1) We dequeue an event (2) The queue is empty, we return true (even though the event has not been processed) (3) The test asserts something assuming that all listeners have finished executing (and fails) (4) The listeners receive and process the event This PR makes (1) and (4) atomic by synchronizing around it. To do that, however, we must avoid using `eventQueue.take`, which is blocking and will cause a deadlock if we synchronize around it. As a workaround, we use the non-blocking `eventQueue.poll` + a semaphore to provide the same semantics. This has been a possible race condition for a long time, but for some reason we've never run into it. Author: Andrew Or <andrewor14@gmail.com> Closes #544 from andrewor14/stage-info-test-fix and squashes the following commits: 3cbe40c [Andrew Or] Merge github.com:apache/spark into stage-info-test-fix 56dbbcb [Andrew Or] Check if event is actually added before releasing semaphore eb486ae [Andrew Or] Synchronize accesses to the LiveListenerBus' event queue
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala38
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala5
2 files changed, 31 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index cbac4c13ca..dec3316bf7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import org.apache.spark.Logging
@@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
+
+ // A counter that represents the number of events produced and consumed in the queue
+ private val eventLock = new Semaphore(0)
+
private val listenerThread = new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
- val event = eventQueue.take
- if (event == SparkListenerShutdown) {
- // Get out of the while loop and shutdown the daemon thread
- return
+ eventLock.acquire()
+ // Atomically remove and process this event
+ LiveListenerBus.this.synchronized {
+ val event = eventQueue.poll
+ if (event == SparkListenerShutdown) {
+ // Get out of the while loop and shutdown the daemon thread
+ return
+ }
+ Option(event).foreach(postToAll)
}
- postToAll(event)
}
}
}
@@ -67,7 +75,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
def post(event: SparkListenerEvent) {
val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
+ if (eventAdded) {
+ eventLock.release()
+ } else if (!queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
@@ -76,13 +86,13 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
/**
- * Waits until there are no more events in the queue, or until the specified time has elapsed.
- * Used for testing only. Returns true if the queue has emptied and false is the specified time
+ * For testing only. Wait until there are no more events in the queue, or until the specified
+ * time has elapsed. Return true if the queue has emptied and false is the specified time
* elapsed before the queue emptied.
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty) {
+ while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
@@ -93,6 +103,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
true
}
+ /**
+ * Return whether the event queue is empty.
+ *
+ * The use of synchronized here guarantees that all events that once belonged to this queue
+ * have already been processed by all attached listeners, if this returns true.
+ */
+ def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
+
def stop() {
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index ba048ced32..4e9fd07e68 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.SparkContext._
import org.apache.spark.executor.TaskMetrics
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
- with BeforeAndAfter with BeforeAndAfterAll {
+ with BeforeAndAfter with BeforeAndAfterAll {
+
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
@@ -37,7 +38,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
sc = new SparkContext("local", "SparkListenerSuite")
}
- override def afterAll {
+ override def afterAll() {
System.clearProperty("spark.akka.frameSize")
}