diff options
author | Kan Zhang <kzhang@apache.org> | 2014-04-09 15:24:33 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-09 15:25:29 -0700 |
commit | eb5f2b64230faa69a53815cb61bcc87aeb233d20 (patch) | |
tree | c9b18d1b09d10a2d893c421e9b4ee79afb768b55 /core | |
parent | bde9cc11fee42a0a41ec52d5dc7fa0502ce94f77 (diff) | |
download | spark-eb5f2b64230faa69a53815cb61bcc87aeb233d20.tar.gz spark-eb5f2b64230faa69a53815cb61bcc87aeb233d20.tar.bz2 spark-eb5f2b64230faa69a53815cb61bcc87aeb233d20.zip |
SPARK-1407 drain event queue before stopping event logger
Author: Kan Zhang <kzhang@apache.org>
Closes #366 from kanzhang/SPARK-1407 and squashes the following commits:
cd0629f [Kan Zhang] code refactoring and adding test
b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger
Diffstat (limited to 'core')
3 files changed, 66 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f7750514ae..76305237b0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { ui.stop() - eventLogger.foreach(_.stop()) // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler @@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging { metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() - listenerBus.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() + listenerBus.stop() + eventLogger.foreach(_.stop()) logInfo("Successfully stopped SparkContext") } else { logInfo("SparkContext already stopped") diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 353a48661b..76f3e327d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
+ private val listenerThread = new Thread("SparkListenerBus") {
+ setDaemon(true)
+ override def run() {
+ while (true) {
+ val event = eventQueue.take
+ if (event == SparkListenerShutdown) {
+ // Get out of the while loop and shutdown the daemon thread
+ return
+ }
+ postToAll(event)
+ }
+ }
+ }
+
+ // Exposed for testing
+ @volatile private[spark] var stopCalled = false
/**
* Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { if (started) {
throw new IllegalStateException("Listener bus already started!")
}
+ listenerThread.start()
started = true
- new Thread("SparkListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- if (event == SparkListenerShutdown) {
- // Get out of the while loop and shutdown the daemon thread
- return
- }
- postToAll(event)
- }
- }
- }.start()
}
def post(event: SparkListenerEvent) {
@@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { }
def stop() {
+ stopCalled = true
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
post(SparkListenerShutdown)
+ listenerThread.join()
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 7c843772bc..dc704e07a8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.concurrent.Semaphore + import scala.collection.mutable import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} @@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } } + test("bus.stop() waits for the event queue to completely drain") { + @volatile var drained = false + + // Tells the listener to stop blocking + val listenerWait = new Semaphore(1) + + // When stop has returned + val stopReturned = new Semaphore(1) + + class BlockingListener extends SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd) = { + listenerWait.acquire() + drained = true + } + } + + val bus = new LiveListenerBus + val blockingListener = new BlockingListener + + bus.addListener(blockingListener) + bus.start() + bus.post(SparkListenerJobEnd(0, JobSucceeded)) + + // the queue should not drain immediately + assert(!drained) + + new Thread("ListenerBusStopper") { + override def run() { + // stop() will block until notify() is called below + bus.stop() + stopReturned.release(1) + } + }.start() + + while (!bus.stopCalled) { + Thread.sleep(10) + } + + listenerWait.release() + stopReturned.acquire() + assert(drained) + } + test("basic creation of StageInfo") { val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) |