aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-01-11 13:36:37 -0800
committerReynold Xin <rxin@apache.org>2014-01-11 13:36:37 -0800
commit2180c8718895f1773a9906ea0ddd49171c468e0b (patch)
treeb873915cddc38e52e50c27ef10789708359ab293
parentee6e7f9b8cc56985787546882fba291cf9ad7667 (diff)
downloadspark-2180c8718895f1773a9906ea0ddd49171c468e0b.tar.gz
spark-2180c8718895f1773a9906ea0ddd49171c468e0b.tar.bz2
spark-2180c8718895f1773a9906ea0ddd49171c468e0b.zip
Stop SparkListenerBus daemon thread when DAGScheduler is stopped.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala15
3 files changed, 17 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 38b536023b..7046c06d20 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -133,7 +133,8 @@ class DAGScheduler(
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
- private[spark] val listenerBus = new SparkListenerBus()
+ // An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped.
+ private[spark] val listenerBus = new SparkListenerBus
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
@@ -1121,5 +1122,6 @@ class DAGScheduler(
}
metadataCleaner.cancel()
taskSched.stop()
+ listenerBus.stop()
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 627995c826..55a40a92c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
+/** An event used in the listener to shutdown the listener daemon thread. */
+private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
+
trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e7defd768b..fc637884e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import org.apache.spark.Logging
/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus() extends Logging {
- private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+private[spark] class SparkListenerBus extends Logging {
+ private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
+ private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
+ // Create a new daemon thread to listen for events. This thread is stopped when it receives
+ // a SparkListenerShutdown event, using the stop method.
new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
@@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging {
sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
case taskEnd: SparkListenerTaskEnd =>
sparkListeners.foreach(_.onTaskEnd(taskEnd))
+ case SparkListenerShutdown =>
+ // Get out of the while loop and shutdown the daemon thread
+ return
case _ =>
}
}
@@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging {
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty()) {
+ while (!eventQueue.isEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
@@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging {
}
return true
}
+
+ def stop(): Unit = post(SparkListenerShutdown)
}