Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkContext.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  57
1 file changed, 54 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7f5aef1c75..a7adddb6c8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import scala.language.implicitConversions

import java.io._
+import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
@@ -387,9 +388,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    }
  executorAllocationManager.foreach(_.start())

-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
  private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
@@ -399,6 +397,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  }
  cleaner.foreach(_.start())

+  setupAndStartListenerBus()
  postEnvironmentUpdate()
  postApplicationStart()
@@ -1563,6 +1562,58 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  /** Register a new RDD, returning its RDD ID */
  private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

+  /**
+   * Registers listeners specified in spark.extraListeners, then starts the listener bus.
+   * This should be called after all internal listeners have been registered with the listener bus
+   * (e.g. after the web UI and event logging listeners have been registered).
+   */
+  private def setupAndStartListenerBus(): Unit = {
+    // Use reflection to instantiate listeners specified via `spark.extraListeners`
+    try {
+      val listenerClassNames: Seq[String] =
+        conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
+      for (className <- listenerClassNames) {
+        // Use reflection to find the right constructor
+        val constructors = {
+          val listenerClass = Class.forName(className)
+          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
+        }
+        val constructorTakingSparkConf = constructors.find { c =>
+          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
+        }
+        lazy val zeroArgumentConstructor = constructors.find { c =>
+          c.getParameterTypes.isEmpty
+        }
+        val listener: SparkListener = {
+          if (constructorTakingSparkConf.isDefined) {
+            constructorTakingSparkConf.get.newInstance(conf)
+          } else if (zeroArgumentConstructor.isDefined) {
+            zeroArgumentConstructor.get.newInstance()
+          } else {
+            throw new SparkException(
+              s"$className did not have a zero-argument constructor or a" +
+                " single-argument constructor that accepts SparkConf. Note: if the class is" +
+                " defined inside of another Scala class, then its constructors may accept an" +
+                " implicit parameter that references the enclosing class; in this case, you must" +
+                " define the listener as a top-level class in order to prevent this extra" +
+                " parameter from breaking Spark's ability to find a valid constructor.")
+          }
+        }
+        listenerBus.addListener(listener)
+        logInfo(s"Registered listener $className")
+      }
+    } catch {
+      case e: Exception =>
+        try {
+          stop()
+        } finally {
+          throw new SparkException(s"Exception when registering SparkListener", e)
+        }
+    }
+
+    listenerBus.start()
+  }
+
  /** Post the application start event */
  private def postApplicationStart() {
    // Note: this code assumes that the task scheduler has been initialized and has contacted
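
For illustration, the sketch below shows a user-supplied listener that satisfies the constructor rules this patch enforces: a constructor taking a single SparkConf is tried first, with a zero-argument constructor as the fallback. The MyAppListener class and the demo driver are hypothetical examples, not part of this commit; only the spark.extraListeners key and the constructor-resolution order come from the code above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Hypothetical listener (not part of this commit). It is a top-level
// class so that no implicit enclosing-class parameter is added to its
// constructors; setupAndStartListenerBus() prefers this SparkConf
// constructor over a zero-argument one.
class MyAppListener(conf: SparkConf) extends SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
    println(s"${conf.get("spark.app.name", "app")} ended at ${end.time}")
  }
}

// Hypothetical driver: the listener is named by class name and is
// instantiated reflectively inside SparkContext before the bus starts.
object ExtraListenersDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("extra-listeners-demo")
      .set("spark.extraListeners", "MyAppListener")
    val sc = new SparkContext(conf)
    sc.stop()
  }
}

Because setupAndStartListenerBus() registers the extra listeners before calling listenerBus.start(), events buffered during context startup are only released once user-specified listeners are attached, so they observe the same early events as Spark's internal listeners.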