diff options
author | Michael Armbrust <michael@databricks.com> | 2016-04-07 18:05:54 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-07 18:05:54 -0700 |
commit | 692c74840bc53debbb842db5372702f58207412c (patch) | |
tree | a586d61e5e438dc3371823bba3b66cea6c53f424 | |
parent | 3e29e372ff518827bae9dcd26087946fde476843 (diff) | |
download | spark-692c74840bc53debbb842db5372702f58207412c.tar.gz spark-692c74840bc53debbb842db5372702f58207412c.tar.bz2 spark-692c74840bc53debbb842db5372702f58207412c.zip |
[SPARK-14449][SQL] SparkContext should use SparkListenerInterface
Currently all `SparkFirehoseListener` implementations are broken since we expect listeners to extend `SparkListener`, while the fire hose only extends `SparkListenerInterface`. This changes the addListener function and the config based injection to use the interface instead.
The existing tests in SparkListenerSuite are improved such that they would have caught this.
Follow-up to #12142
Author: Michael Armbrust <michael@databricks.com>
Closes #12227 from marmbrus/fixListener.
4 files changed, 27 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c40fada64b..9ec5cedf25 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1356,7 +1356,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Register a listener to receive up-calls from events that happen during execution. */ @DeveloperApi - def addSparkListener(listener: SparkListener) { + def addSparkListener(listener: SparkListenerInterface) { listenerBus.addListener(listener) } @@ -2007,7 +2007,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Use reflection to find the right constructor val constructors = { val listenerClass = Utils.classForName(className) - listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] + listenerClass + .getConstructors + .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]] } val constructorTakingSparkConf = constructors.find { c => c.getParameterTypes.sameElements(Array(classOf[SparkConf])) @@ -2015,7 +2017,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli lazy val zeroArgumentConstructor = constructors.find { c => c.getParameterTypes.isEmpty } - val listener: SparkListener = { + val listener: SparkListenerInterface = { if (constructorTakingSparkConf.isDefined) { constructorTakingSparkConf.get.newInstance(conf) } else if (zeroArgumentConstructor.isDefined) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 94f0574f0e..471586ac08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -22,9 +22,12 @@ import org.apache.spark.util.ListenerBus /** * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners */ -private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] { +private[spark] trait SparkListenerBus + extends ListenerBus[SparkListenerInterface, SparkListenerEvent] { - protected override def doPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = { + protected override def doPostEvent( + listener: SparkListenerInterface, + event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => listener.onStageSubmitted(stageSubmitted) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 58d217ffef..b854d742b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.scalatest.Matchers -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.{ResetSystemProperties, RpcUtils} @@ -377,13 +377,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("registering listeners via spark.extraListeners") { + val listeners = Seq( + classOf[ListenerThatAcceptsSparkConf], + classOf[FirehoseListenerThatAcceptsSparkConf], + classOf[BasicJobCounter]) val conf = new SparkConf().setMaster("local").setAppName("test") - .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," + - classOf[BasicJobCounter].getName) + .set("spark.extraListeners", listeners.map(_.getName).mkString(",")) sc = new SparkContext(conf) sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1) sc.listenerBus.listeners.asScala .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1) + sc.listenerBus.listeners.asScala + .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1) } /** @@ -476,3 +481,11 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene var count = 0 override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1 } + +private class FirehoseListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkFirehoseListener { + var count = 0 + override def onEvent(event: SparkListenerEvent): Unit = event match { + case job: SparkListenerJobEnd => count += 1 + case _ => + } +} diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index d916c49a6a..fbadc563b8 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -68,6 +68,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"), // SPARK-14358 SparkListener from trait to abstract class + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.SparkContext.addSparkListener"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.JavaSparkListener"), ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkFirehoseListener"), ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.scheduler.SparkListener"), |