author    Michael Armbrust <michael@databricks.com>  2016-04-07 18:05:54 -0700
committer Reynold Xin <rxin@databricks.com>          2016-04-07 18:05:54 -0700
commit    692c74840bc53debbb842db5372702f58207412c (patch)
tree      a586d61e5e438dc3371823bba3b66cea6c53f424 /core
parent    3e29e372ff518827bae9dcd26087946fde476843 (diff)
[SPARK-14449][SQL] SparkContext should use SparkListenerInterface
Currently all `SparkFirehoseListener` implementations are broken, since we expect
listeners to extend `SparkListener` while the firehose only extends
`SparkListenerInterface`. This changes the `addListener` function and the
config-based injection to use the interface instead.

The existing tests in SparkListenerSuite are improved such that they would have
caught this.

Follow-up to #12142

Author: Michael Armbrust <michael@databricks.com>

Closes #12227 from marmbrus/fixListener.
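To make the failure mode concrete, here is a minimal sketch of a firehose listener registered programmatically (the `EventCounter` name is hypothetical). Before this patch, `addSparkListener` required a `SparkListener` subclass, so the final call would not compile; with the parameter widened to `SparkListenerInterface`, it does:

    import org.apache.spark.{SparkConf, SparkContext, SparkFirehoseListener}
    import org.apache.spark.scheduler.SparkListenerEvent

    // A firehose listener funnels every scheduler event into a single callback.
    class EventCounter extends SparkFirehoseListener {
      @volatile var count = 0
      override def onEvent(event: SparkListenerEvent): Unit = count += 1
    }

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    // Compiles only once addSparkListener accepts SparkListenerInterface,
    // which SparkFirehoseListener implements directly.
    sc.addSparkListener(new EventCounter)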
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                 |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala   |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 19
3 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c40fada64b..9ec5cedf25 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1356,7 +1356,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   * Register a listener to receive up-calls from events that happen during execution.
   */
  @DeveloperApi
-  def addSparkListener(listener: SparkListener) {
+  def addSparkListener(listener: SparkListenerInterface) {
    listenerBus.addListener(listener)
  }
@@ -2007,7 +2007,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        // Use reflection to find the right constructor
        val constructors = {
          val listenerClass = Utils.classForName(className)
-         listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
+         listenerClass
+           .getConstructors
+           .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
        }
        val constructorTakingSparkConf = constructors.find { c =>
          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
@@ -2015,7 +2017,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        lazy val zeroArgumentConstructor = constructors.find { c =>
          c.getParameterTypes.isEmpty
        }
-       val listener: SparkListener = {
+       val listener: SparkListenerInterface = {
          if (constructorTakingSparkConf.isDefined) {
            constructorTakingSparkConf.get.newInstance(conf)
          } else if (zeroArgumentConstructor.isDefined) {
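The reflection logic above accepts two constructor shapes for classes named in `spark.extraListeners`: one taking a `SparkConf` (preferred when both exist) and a zero-argument one. A short usage sketch, with hypothetical listener names:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    // Zero-argument constructor: instantiated as-is.
    class JobCounter extends SparkListener {
      var jobs = 0
      override def onJobEnd(end: SparkListenerJobEnd): Unit = jobs += 1
    }

    // Single SparkConf constructor: chosen first when present.
    class ConfAwareListener(conf: SparkConf) extends SparkListener

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("demo")
      // Comma-separated class names, instantiated reflectively at startup.
      .set("spark.extraListeners",
        Seq(classOf[JobCounter], classOf[ConfAwareListener]).map(_.getName).mkString(","))
    val sc = new SparkContext(conf)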
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 94f0574f0e..471586ac08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -22,9 +22,12 @@ import org.apache.spark.util.ListenerBus
/**
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
 */
-private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
+private[spark] trait SparkListenerBus
+  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

-  protected override def doPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
+  protected override def doPostEvent(
+      listener: SparkListenerInterface,
+      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
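For context on why the firehose extends only the interface: `SparkFirehoseListener` satisfies `SparkListenerInterface` by forwarding each typed callback into a single `onEvent` method. A rough sketch of that shape (most callbacks elided; the `FirehoseSketch` name is hypothetical):

    // SparkListenerInterface is private[spark], so a forwarder like this must
    // be declared inside the org.apache.spark package, as SparkFirehoseListener is.
    package org.apache.spark

    import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerInterface,
      SparkListenerStageCompleted, SparkListenerStageSubmitted}

    // Sketch only: every typed callback funnels into one generic handler.
    abstract class FirehoseSketch extends SparkListenerInterface {
      def onEvent(event: SparkListenerEvent): Unit

      override def onStageSubmitted(s: SparkListenerStageSubmitted): Unit = onEvent(s)
      override def onStageCompleted(s: SparkListenerStageCompleted): Unit = onEvent(s)
      // ...one forwarding override per remaining SparkListenerInterface method...
    }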
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 58d217ffef..b854d742b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.scalatest.Matchers

-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
@@ -377,13 +377,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
  }

  test("registering listeners via spark.extraListeners") {
+    val listeners = Seq(
+      classOf[ListenerThatAcceptsSparkConf],
+      classOf[FirehoseListenerThatAcceptsSparkConf],
+      classOf[BasicJobCounter])
    val conf = new SparkConf().setMaster("local").setAppName("test")
-      .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
-        classOf[BasicJobCounter].getName)
+      .set("spark.extraListeners", listeners.map(_.getName).mkString(","))
    sc = new SparkContext(conf)
    sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1)
    sc.listenerBus.listeners.asScala
      .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1)
+    sc.listenerBus.listeners.asScala
+      .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
  }

  /**
@@ -476,3 +481,11 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
  var count = 0
  override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}
+
+private class FirehoseListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkFirehoseListener {
+  var count = 0
+  override def onEvent(event: SparkListenerEvent): Unit = event match {
+    case job: SparkListenerJobEnd => count += 1
+    case _ =>
+  }
+}