From 692c74840bc53debbb842db5372702f58207412c Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 7 Apr 2016 18:05:54 -0700 Subject: [SPARK-14449][SQL] SparkContext should use SparkListenerInterface Currently all `SparkFirehoseListener` implementations are broken since we expect listeners to extend `SparkListener`, while the fire hose only extends `SparkListenerInterface`. This changes the addListener function and the config based injection to use the interface instead. The existing tests in SparkListenerSuite are improved such that they would have caught this. Follow-up to #12142 Author: Michael Armbrust Closes #12227 from marmbrus/fixListener. --- .../main/scala/org/apache/spark/SparkContext.scala | 8 +++++--- .../org/apache/spark/scheduler/SparkListenerBus.scala | 7 +++++-- .../apache/spark/scheduler/SparkListenerSuite.scala | 19 ++++++++++++++++--- project/MimaExcludes.scala | 1 + 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c40fada64b..9ec5cedf25 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1356,7 +1356,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Register a listener to receive up-calls from events that happen during execution. */ @DeveloperApi - def addSparkListener(listener: SparkListener) { + def addSparkListener(listener: SparkListenerInterface) { listenerBus.addListener(listener) } @@ -2007,7 +2007,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Use reflection to find the right constructor val constructors = { val listenerClass = Utils.classForName(className) - listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] + listenerClass + .getConstructors + .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]] } val constructorTakingSparkConf = constructors.find { c => c.getParameterTypes.sameElements(Array(classOf[SparkConf])) @@ -2015,7 +2017,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli lazy val zeroArgumentConstructor = constructors.find { c => c.getParameterTypes.isEmpty } - val listener: SparkListener = { + val listener: SparkListenerInterface = { if (constructorTakingSparkConf.isDefined) { constructorTakingSparkConf.get.newInstance(conf) } else if (zeroArgumentConstructor.isDefined) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 94f0574f0e..471586ac08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -22,9 +22,12 @@ import org.apache.spark.util.ListenerBus /** * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners */ -private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] { +private[spark] trait SparkListenerBus + extends ListenerBus[SparkListenerInterface, SparkListenerEvent] { - protected override def doPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = { + protected override def doPostEvent( + listener: SparkListenerInterface, + event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => listener.onStageSubmitted(stageSubmitted) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 58d217ffef..b854d742b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.scalatest.Matchers -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.{ResetSystemProperties, RpcUtils} @@ -377,13 +377,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("registering listeners via spark.extraListeners") { + val listeners = Seq( + classOf[ListenerThatAcceptsSparkConf], + classOf[FirehoseListenerThatAcceptsSparkConf], + classOf[BasicJobCounter]) val conf = new SparkConf().setMaster("local").setAppName("test") - .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," + - classOf[BasicJobCounter].getName) + .set("spark.extraListeners", listeners.map(_.getName).mkString(",")) sc = new SparkContext(conf) sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1) sc.listenerBus.listeners.asScala .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1) + sc.listenerBus.listeners.asScala + .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1) } /** @@ -476,3 +481,11 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene var count = 0 override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1 } + +private class FirehoseListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkFirehoseListener { + var count = 0 + override def onEvent(event: SparkListenerEvent): Unit = event match { + case job: SparkListenerJobEnd => count += 1 + case _ => + } +} diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index d916c49a6a..fbadc563b8 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -68,6 +68,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"), // SPARK-14358 SparkListener from trait to abstract class + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.SparkContext.addSparkListener"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.JavaSparkListener"), ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkFirehoseListener"), ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.scheduler.SparkListener"), -- cgit v1.2.3