diff options
author | Michael Armbrust <michael@databricks.com> | 2016-04-07 18:05:54 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-07 18:05:54 -0700 |
commit | 692c74840bc53debbb842db5372702f58207412c (patch) | |
tree | a586d61e5e438dc3371823bba3b66cea6c53f424 /core/src/main/scala/org/apache | |
parent | 3e29e372ff518827bae9dcd26087946fde476843 (diff) | |
download | spark-692c74840bc53debbb842db5372702f58207412c.tar.gz spark-692c74840bc53debbb842db5372702f58207412c.tar.bz2 spark-692c74840bc53debbb842db5372702f58207412c.zip |
[SPARK-14449][SQL] SparkContext should use SparkListenerInterface
Currently all `SparkFirehoseListener` implementations are broken since we expect listeners to extend `SparkListener`, while the fire hose only extends `SparkListenerInterface`. This changes the addListener function and the config based injection to use the interface instead.
The existing tests in SparkListenerSuite are improved such that they would have caught this.
Follow-up to #12142
Author: Michael Armbrust <michael@databricks.com>
Closes #12227 from marmbrus/fixListener.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala | 7 |
2 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c40fada64b..9ec5cedf25 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1356,7 +1356,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Register a listener to receive up-calls from events that happen during execution. */ @DeveloperApi - def addSparkListener(listener: SparkListener) { + def addSparkListener(listener: SparkListenerInterface) { listenerBus.addListener(listener) } @@ -2007,7 +2007,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Use reflection to find the right constructor val constructors = { val listenerClass = Utils.classForName(className) - listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] + listenerClass + .getConstructors + .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]] } val constructorTakingSparkConf = constructors.find { c => c.getParameterTypes.sameElements(Array(classOf[SparkConf])) @@ -2015,7 +2017,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli lazy val zeroArgumentConstructor = constructors.find { c => c.getParameterTypes.isEmpty } - val listener: SparkListener = { + val listener: SparkListenerInterface = { if (constructorTakingSparkConf.isDefined) { constructorTakingSparkConf.get.newInstance(conf) } else if (zeroArgumentConstructor.isDefined) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 94f0574f0e..471586ac08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -22,9 +22,12 @@ import org.apache.spark.util.ListenerBus /** * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners */ -private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] { +private[spark] trait SparkListenerBus + extends ListenerBus[SparkListenerInterface, SparkListenerEvent] { - protected override def doPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = { + protected override def doPostEvent( + listener: SparkListenerInterface, + event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => listener.onStageSubmitted(stageSubmitted) |