diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-12 16:55:11 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-12 16:55:11 -0800 |
commit | 82e2b92c6dfebc2f570dbef366696c150548be55 (patch) | |
tree | e605240319817b9267f2c5788e25d5399ae0a06e | |
parent | 288a878999848adb130041d1e40c14bfc879cec6 (diff) | |
parent | 2180c8718895f1773a9906ea0ddd49171c468e0b (diff) | |
download | spark-82e2b92c6dfebc2f570dbef366696c150548be55.tar.gz spark-82e2b92c6dfebc2f570dbef366696c150548be55.tar.bz2 spark-82e2b92c6dfebc2f570dbef366696c150548be55.zip |
Merge pull request #392 from rxin/listenerbus
Stop SparkListenerBus daemon thread when DAGScheduler is stopped.
Otherwise this leads to hundreds of SparkListenerBus daemon threads in our unit tests (and also problematic if user applications launches multiple SparkContext).
3 files changed, 17 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 38b536023b..7046c06d20 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -133,7 +133,8 @@ class DAGScheduler( private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] - private[spark] val listenerBus = new SparkListenerBus() + // An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped. + private[spark] val listenerBus = new SparkListenerBus // Contains the locations that each RDD's partitions are cached on private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] @@ -1121,5 +1122,6 @@ class DAGScheduler( } metadataCleaner.cancel() taskSched.stop() + listenerBus.stop() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 627995c826..55a40a92c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) extends SparkListenerEvents +/** An event used in the listener to shutdown the listener daemon thread. */ +private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents + trait SparkListener { /** * Called when a stage is completed, with information on the completed stage diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e7defd768b..fc637884e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import org.apache.spark.Logging
/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus() extends Logging {
- private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+private[spark] class SparkListenerBus extends Logging {
+ private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
+ private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
+ // Create a new daemon thread to listen for events. This thread is stopped when it receives
+ // a SparkListenerShutdown event, using the stop method.
new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
@@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging { sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
case taskEnd: SparkListenerTaskEnd =>
sparkListeners.foreach(_.onTaskEnd(taskEnd))
+ case SparkListenerShutdown =>
+ // Get out of the while loop and shutdown the daemon thread
+ return
case _ =>
}
}
@@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging { */
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty()) {
+ while (!eventQueue.isEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
@@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging { }
return true
}
+
+ def stop(): Unit = post(SparkListenerShutdown)
}
|