From 2180c8718895f1773a9906ea0ddd49171c468e0b Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sat, 11 Jan 2014 13:36:37 -0800
Subject: Stop SparkListenerBus daemon thread when DAGScheduler is stopped.

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala  |  4 +++-
 .../scala/org/apache/spark/scheduler/SparkListener.scala |  3 +++
 .../org/apache/spark/scheduler/SparkListenerBus.scala    | 15 +++++++++++----
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 38b536023b..7046c06d20 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -133,7 +133,8 @@ class DAGScheduler(
 
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
-  private[spark] val listenerBus = new SparkListenerBus()
+  // An async scheduler event bus. The bus should be stopped when DAGScheduler is stopped.
+  private[spark] val listenerBus = new SparkListenerBus
 
   // Contains the locations that each RDD's partitions are cached on
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
@@ -1121,5 +1122,6 @@ class DAGScheduler(
     }
     metadataCleaner.cancel()
     taskSched.stop()
+    listenerBus.stop()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 627995c826..55a40a92c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie
 case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
      extends SparkListenerEvents
 
+/** An event used in the listener to shut down the listener daemon thread. */
+private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
+
 trait SparkListener {
   /**
    * Called when a stage is completed, with information on the completed stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e7defd768b..fc637884e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import org.apache.spark.Logging
 
 /** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus() extends Logging {
-  private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+private[spark] class SparkListenerBus extends Logging {
+  private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
 
   /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
    * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000 
+  private val EVENT_QUEUE_CAPACITY = 10000
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
 
+  // Create a new daemon thread to listen for events. This thread is stopped when it receives
+  // a SparkListenerShutdown event, using the stop method.
   new Thread("SparkListenerBus") {
     setDaemon(true)
     override def run() {
@@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging {
             sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
           case taskEnd: SparkListenerTaskEnd =>
             sparkListeners.foreach(_.onTaskEnd(taskEnd))
+          case SparkListenerShutdown =>
+            // Get out of the while loop and shut down the daemon thread
+            return
           case _ =>
         }
       }
@@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging {
    */
   def waitUntilEmpty(timeoutMillis: Int): Boolean = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty()) {
+    while (!eventQueue.isEmpty) {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
@@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging {
     }
     return true
   }
+
+  def stop(): Unit = post(SparkListenerShutdown)
 }
--
cgit v1.2.3
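For reference, below is a minimal, self-contained Scala sketch of the shutdown pattern this patch introduces: events are drained from a LinkedBlockingQueue by a daemon thread, and stop() posts a sentinel ("poison pill") event that makes the thread return. None of the names below are Spark's; they are invented purely for illustration.

    import java.util.concurrent.LinkedBlockingQueue

    object ListenerBusSketch {
      sealed trait Event
      case class Message(text: String) extends Event
      case object Shutdown extends Event   // sentinel, analogous to SparkListenerShutdown

      // Bounded queue so producers fail loudly instead of OOM-ing if the bus falls behind.
      private val queue = new LinkedBlockingQueue[Event](10000)

      // Daemon thread that drains the queue until it sees the Shutdown sentinel.
      private val thread = new Thread("listener-bus-sketch") {
        setDaemon(true)
        override def run(): Unit = {
          while (true) {
            queue.take() match {
              case Message(text) => println(s"handled: $text")
              case Shutdown      => return   // exit the loop; the thread then dies
            }
          }
        }
      }
      thread.start()

      def post(event: Event): Unit = queue.put(event)
      def stop(): Unit = post(Shutdown)      // same idea as SparkListenerBus.stop()

      def main(args: Array[String]): Unit = {
        post(Message("hello"))
        stop()
        thread.join()   // returns because the sentinel terminates the drain loop
      }
    }

Because the sentinel travels through the same queue as ordinary events, every event posted before stop() is still processed before the thread exits, which is why DAGScheduler.stop() can call listenerBus.stop() last without losing queued events.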