about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorKan Zhang <kzhang@apache.org>2014-04-09 15:24:33 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-09 15:25:29 -0700
commiteb5f2b64230faa69a53815cb61bcc87aeb233d20 (patch)
treec9b18d1b09d10a2d893c421e9b4ee79afb768b55 /core
parentbde9cc11fee42a0a41ec52d5dc7fa0502ce94f77 (diff)
downloadspark-eb5f2b64230faa69a53815cb61bcc87aeb233d20.tar.gz
spark-eb5f2b64230faa69a53815cb61bcc87aeb233d20.tar.bz2
spark-eb5f2b64230faa69a53815cb61bcc87aeb233d20.zip
SPARK-1407 drain event queue before stopping event logger
Author: Kan Zhang <kzhang@apache.org> Closes #366 from kanzhang/SPARK-1407 and squashes the following commits: cd0629f [Kan Zhang] code refactoring and adding test b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala32
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala45
3 files changed, 66 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f7750514ae..76305237b0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** Shut down the SparkContext. */
def stop() {
ui.stop()
- eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
@@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
metadataCleaner.cancel()
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
- listenerBus.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
+ listenerBus.stop()
+ eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
} else {
logInfo("SparkContext already stopped")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 353a48661b..76f3e327d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
+ private val listenerThread = new Thread("SparkListenerBus") {
+ setDaemon(true)
+ override def run() {
+ while (true) {
+ val event = eventQueue.take
+ if (event == SparkListenerShutdown) {
+ // Get out of the while loop and shutdown the daemon thread
+ return
+ }
+ postToAll(event)
+ }
+ }
+ }
+
+ // Exposed for testing
+ @volatile private[spark] var stopCalled = false
/**
* Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
if (started) {
throw new IllegalStateException("Listener bus already started!")
}
+ listenerThread.start()
started = true
- new Thread("SparkListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- if (event == SparkListenerShutdown) {
- // Get out of the while loop and shutdown the daemon thread
- return
- }
- postToAll(event)
- }
- }
- }.start()
}
def post(event: SparkListenerEvent) {
@@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
def stop() {
+ stopCalled = true
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
post(SparkListenerShutdown)
+ listenerThread.join()
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 7c843772bc..dc704e07a8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import java.util.concurrent.Semaphore
+
import scala.collection.mutable
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
@@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
}
}
+ test("bus.stop() waits for the event queue to completely drain") {
+ @volatile var drained = false
+
+ // Tells the listener to stop blocking
+ val listenerWait = new Semaphore(1)
+
+ // When stop has returned
+ val stopReturned = new Semaphore(1)
+
+ class BlockingListener extends SparkListener {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+ listenerWait.acquire()
+ drained = true
+ }
+ }
+
+ val bus = new LiveListenerBus
+ val blockingListener = new BlockingListener
+
+ bus.addListener(blockingListener)
+ bus.start()
+ bus.post(SparkListenerJobEnd(0, JobSucceeded))
+
+ // the queue should not drain immediately
+ assert(!drained)
+
+ new Thread("ListenerBusStopper") {
+ override def run() {
+ // stop() will block until notify() is called below
+ bus.stop()
+ stopReturned.release(1)
+ }
+ }.start()
+
+ while (!bus.stopCalled) {
+ Thread.sleep(10)
+ }
+
+ listenerWait.release()
+ stopReturned.acquire()
+ assert(drained)
+ }
+
test("basic creation of StageInfo") {
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)