-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                              |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala            |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala                 | 167
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala                   |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala                |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala              | 190
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ListenerBus.scala                          |  14
-rw-r--r--  project/MimaExcludes.scala                                                           |   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala           |   9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala     |   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala |  69
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala          |   2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala     |  22
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala |   2
14 files changed, 269 insertions(+), 231 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 77acb7052d..d7c605a583 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1644,9 +1644,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Shut down the SparkContext.
def stop() {
- if (AsynchronousListenerBus.withinListenerThread.value) {
- throw new SparkException("Cannot stop SparkContext within listener thread of" +
- " AsynchronousListenerBus")
+ if (LiveListenerBus.withinListenerThread.value) {
+ throw new SparkException(
+ s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
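The guard above works because `withinListenerThread` is a `scala.util.DynamicVariable`: the flag reads true only for code executing inside `withValue(true) { ... }` on the listener thread itself, so a re-entrant stop() from a listener can be detected and rejected. A minimal standalone sketch of the same pattern (names and messages are illustrative, not Spark's):

import scala.util.DynamicVariable

object ListenerThreadGuard {
  // True only for code running inside withValue { ... } on a given thread.
  val withinListenerThread = new DynamicVariable[Boolean](false)

  def stop(): Unit = {
    if (withinListenerThread.value) {
      throw new IllegalStateException("Cannot stop from within the listener thread")
    }
    println("stopped")
  }

  def main(args: Array[String]): Unit = {
    stop() // fine: the flag is false on the main thread

    val t = new Thread {
      override def run(): Unit = withinListenerThread.withValue(true) {
        try stop() // throws: the flag is true inside withValue
        catch { case e: IllegalStateException => println(e.getMessage) }
      }
    }
    t.start()
    t.join()
  }
}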
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index aa607c5a2d..36f2b74f94 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -200,7 +200,9 @@ private[spark] class EventLoggingListener(
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
override def onOtherEvent(event: SparkListenerEvent): Unit = {
- logEvent(event, flushLogger = true)
+ if (event.logEvent) {
+ logEvent(event, flushLogger = true)
+ }
}
/**
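The new `logEvent` flag lets internal events opt out of the persistent event log while still flowing through the listener bus. A self-contained sketch of the gating idiom, with hypothetical event types (not Spark's API):

trait Event { def logEvent: Boolean = true }               // logged by default
case class JobEvent(id: Int) extends Event                 // keeps the default: logged
case class InternalEvent(msg: String) extends Event {
  override def logEvent: Boolean = false                   // opts out of the event log
}

object EventLogSketch {
  // Mirrors the gate in onOtherEvent above: only willing events reach the log.
  def onOtherEvent(event: Event): Unit =
    if (event.logEvent) println(s"logging $event") else println(s"skipping $event")

  def main(args: Array[String]): Unit = {
    onOtherEvent(JobEvent(1))            // logging JobEvent(1)
    onOtherEvent(InternalEvent("tick"))  // skipping InternalEvent(tick)
  }
}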
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index be23056e7d..1c21313d1c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,24 +17,169 @@
package org.apache.spark.scheduler
+import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.spark.util.AsynchronousListenerBus
+import scala.util.DynamicVariable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
*
- * Until start() is called, all posted events are only buffered. Only after this listener bus
+ * Until `start()` is called, all posted events are only buffered. Only after this listener bus
* has started will events be actually propagated to all attached listeners. This listener bus
- * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
+ * is stopped when `stop()` is called, and it will drop further events after stopping.
*/
-private[spark] class LiveListenerBus
- extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
- with SparkListenerBus {
+private[spark] class LiveListenerBus extends SparkListenerBus {
+
+ self =>
+
+ import LiveListenerBus._
+
+ private var sparkContext: SparkContext = null
+
+ // Cap the capacity of the event queue so we get an explicit error (rather than
+ // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
+ private val EVENT_QUEUE_CAPACITY = 10000
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+ // Indicates whether `start()` has been called
+ private val started = new AtomicBoolean(false)
+ // Indicates whether `stop()` has been called
+ private val stopped = new AtomicBoolean(false)
+
+ // Indicates whether we are currently processing an event
+ // Guarded by `self`
+ private var processingEvent = false
private val logDroppedEvent = new AtomicBoolean(false)
- override def onDropEvent(event: SparkListenerEvent): Unit = {
+ // A semaphore whose permits track the number of events produced but not yet consumed
+ private val eventLock = new Semaphore(0)
+
+ private val listenerThread = new Thread(name) {
+ setDaemon(true)
+ override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
+ LiveListenerBus.withinListenerThread.withValue(true) {
+ while (true) {
+ eventLock.acquire()
+ self.synchronized {
+ processingEvent = true
+ }
+ try {
+ val event = eventQueue.poll
+ if (event == null) {
+ // Get out of the while loop and shutdown the daemon thread
+ if (!stopped.get) {
+ throw new IllegalStateException("Polling `null` from eventQueue means" +
+ " the listener bus has been stopped. So `stopped` must be true")
+ }
+ return
+ }
+ postToAll(event)
+ } finally {
+ self.synchronized {
+ processingEvent = false
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Start sending events to attached listeners.
+ *
+ * This first sends out all buffered events posted before this listener bus has started, then
+ * listens for any additional events asynchronously while the listener bus is still running.
+ * This should only be called once.
+ *
+ * @param sc Used to stop the SparkContext in case the listener thread dies.
+ */
+ def start(sc: SparkContext): Unit = {
+ if (started.compareAndSet(false, true)) {
+ sparkContext = sc
+ listenerThread.start()
+ } else {
+ throw new IllegalStateException(s"$name already started!")
+ }
+ }
+
+ def post(event: SparkListenerEvent): Unit = {
+ if (stopped.get) {
+ // Drop further events to make `listenerThread` exit ASAP
+ logError(s"$name has already stopped! Dropping event $event")
+ return
+ }
+ val eventAdded = eventQueue.offer(event)
+ if (eventAdded) {
+ eventLock.release()
+ } else {
+ onDropEvent(event)
+ }
+ }
+
+ /**
+ * For testing only. Wait until there are no more events in the queue, or until the specified
+ * time has elapsed. Throws `TimeoutException` if the specified time elapses before the queue
+ * empties.
+ */
+ @throws(classOf[TimeoutException])
+ def waitUntilEmpty(timeoutMillis: Long): Unit = {
+ val finishTime = System.currentTimeMillis + timeoutMillis
+ while (!queueIsEmpty) {
+ if (System.currentTimeMillis > finishTime) {
+ throw new TimeoutException(
+ s"The event queue is not empty after $timeoutMillis milliseconds")
+ }
+ /* Sleep rather than using wait/notify, because this is used only for testing and
+ * wait/notify add overhead in the general case. */
+ Thread.sleep(10)
+ }
+ }
+
+ /**
+ * For testing only. Return whether the listener daemon thread is still alive.
+ */
+ def listenerThreadIsAlive: Boolean = listenerThread.isAlive
+
+ /**
+ * Return whether the event queue is empty.
+ *
+ * The use of synchronized here guarantees that all events that once belonged to this queue
+ * have already been processed by all attached listeners, if this returns true.
+ */
+ private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
+
+ /**
+ * Stop the listener bus. It will wait until queued events have been processed, but any new
+ * events posted after stopping will be dropped.
+ */
+ def stop(): Unit = {
+ if (!started.get()) {
+ throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
+ }
+ if (stopped.compareAndSet(false, true)) {
+ // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
+ // `stop` is called.
+ eventLock.release()
+ listenerThread.join()
+ } else {
+ // Keep quiet
+ }
+ }
+
+ /**
+ * If the event queue exceeds its capacity, new events will be dropped and this method will be
+ * called to report each dropped event.
+ *
+ * Note: `onDropEvent` can be called in any thread.
+ */
+ def onDropEvent(event: SparkListenerEvent): Unit = {
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
@@ -42,5 +187,13 @@ private[spark] class LiveListenerBus
"the rate at which tasks are being started by the scheduler.")
}
}
+}
+
+private[spark] object LiveListenerBus {
+ // Allows the SparkContext to check whether stop() is called from within the listener thread
+ val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
+ /** The thread name of Spark listener bus */
+ val name = "SparkListenerBus"
}
+
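The bus above pairs a bounded LinkedBlockingQueue with a Semaphore whose permits count posted-but-unconsumed events; stop() releases one extra permit without enqueuing anything, so the consumer thread wakes, polls null, and exits cleanly. A minimal standalone sketch of that shutdown protocol (illustrative names, not Spark code):

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

object BusSketch {
  private val queue = new LinkedBlockingQueue[String](3)   // bounded, like EVENT_QUEUE_CAPACITY
  private val permits = new Semaphore(0)                   // one permit per queued event
  private val stopped = new AtomicBoolean(false)

  private val consumer = new Thread("consumer") {
    override def run(): Unit = while (true) {
      permits.acquire()
      val event = queue.poll()
      if (event == null) return    // only happens after stop() released its extra permit
      println(s"processed $event")
    }
  }

  def post(event: String): Unit =
    if (!stopped.get && queue.offer(event)) permits.release()

  def main(args: Array[String]): Unit = {
    consumer.start()
    post("a"); post("b")
    stopped.set(true)
    permits.release()    // extra permit with no matching event: consumer polls null and exits
    consumer.join()
  }
}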
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index f5267f58c2..6c6883d703 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -34,7 +34,10 @@ import org.apache.spark.util.{Distribution, Utils}
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
-trait SparkListenerEvent
+trait SparkListenerEvent {
+ /* Whether to output this event to the event log */
+ protected[spark] def logEvent: Boolean = true
+}
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 95722a0714..94f0574f0e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.ListenerBus
*/
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
- override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
+ protected override def doPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
deleted file mode 100644
index f6b7ea2f37..0000000000
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.util.DynamicVariable
-
-import org.apache.spark.SparkContext
-
-/**
- * Asynchronously passes events to registered listeners.
- *
- * Until `start()` is called, all posted events are only buffered. Only after this listener bus
- * has started will events be actually propagated to all attached listeners. This listener bus
- * is stopped when `stop()` is called, and it will drop further events after stopping.
- *
- * @param name name of the listener bus, will be the name of the listener thread.
- * @tparam L type of listener
- * @tparam E type of event
- */
-private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String)
- extends ListenerBus[L, E] {
-
- self =>
-
- private var sparkContext: SparkContext = null
-
- /* Cap the capacity of the event queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
-
- // Indicate if `start()` is called
- private val started = new AtomicBoolean(false)
- // Indicate if `stop()` is called
- private val stopped = new AtomicBoolean(false)
-
- // Indicate if we are processing some event
- // Guarded by `self`
- private var processingEvent = false
-
- // A counter that represents the number of events produced and consumed in the queue
- private val eventLock = new Semaphore(0)
-
- private val listenerThread = new Thread(name) {
- setDaemon(true)
- override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
- AsynchronousListenerBus.withinListenerThread.withValue(true) {
- while (true) {
- eventLock.acquire()
- self.synchronized {
- processingEvent = true
- }
- try {
- val event = eventQueue.poll
- if (event == null) {
- // Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from eventQueue means" +
- " the listener bus has been stopped. So `stopped` must be true")
- }
- return
- }
- postToAll(event)
- } finally {
- self.synchronized {
- processingEvent = false
- }
- }
- }
- }
- }
- }
-
- /**
- * Start sending events to attached listeners.
- *
- * This first sends out all buffered events posted before this listener bus has started, then
- * listens for any additional events asynchronously while the listener bus is still running.
- * This should only be called once.
- *
- * @param sc Used to stop the SparkContext in case the listener thread dies.
- */
- def start(sc: SparkContext) {
- if (started.compareAndSet(false, true)) {
- sparkContext = sc
- listenerThread.start()
- } else {
- throw new IllegalStateException(s"$name already started!")
- }
- }
-
- def post(event: E) {
- if (stopped.get) {
- // Drop further events to make `listenerThread` exit ASAP
- logError(s"$name has already stopped! Dropping event $event")
- return
- }
- val eventAdded = eventQueue.offer(event)
- if (eventAdded) {
- eventLock.release()
- } else {
- onDropEvent(event)
- }
- }
-
- /**
- * For testing only. Wait until there are no more events in the queue, or until the specified
- * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
- * emptied.
- * Exposed for testing.
- */
- @throws(classOf[TimeoutException])
- def waitUntilEmpty(timeoutMillis: Long): Unit = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!queueIsEmpty) {
- if (System.currentTimeMillis > finishTime) {
- throw new TimeoutException(
- s"The event queue is not empty after $timeoutMillis milliseconds")
- }
- /* Sleep rather than using wait/notify, because this is used only for testing and
- * wait/notify add overhead in the general case. */
- Thread.sleep(10)
- }
- }
-
- /**
- * For testing only. Return whether the listener daemon thread is still alive.
- * Exposed for testing.
- */
- def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
- /**
- * Return whether the event queue is empty.
- *
- * The use of synchronized here guarantees that all events that once belonged to this queue
- * have already been processed by all attached listeners, if this returns true.
- */
- private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
-
- /**
- * Stop the listener bus. It will wait until the queued events have been processed, but drop the
- * new events after stopping.
- */
- def stop() {
- if (!started.get()) {
- throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
- }
- if (stopped.compareAndSet(false, true)) {
- // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
- // `stop` is called.
- eventLock.release()
- listenerThread.join()
- } else {
- // Keep quiet
- }
- }
-
- /**
- * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
- * notified with the dropped events.
- *
- * Note: `onDropEvent` can be called in any thread.
- */
- def onDropEvent(event: E): Unit
-}
-
-private[spark] object AsynchronousListenerBus {
- /* Allows for Context to check whether stop() call is made within listener thread
- */
- val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
-}
-
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 13cb516b58..5e1fab009c 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -36,11 +36,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
*/
- final def addListener(listener: L) {
+ final def addListener(listener: L): Unit = {
listeners.add(listener)
}
/**
+ * Remove a listener so that it stops receiving events. This method is thread-safe and can be
+ * called in any thread.
+ */
+ final def removeListener(listener: L): Unit = {
+ listeners.remove(listener)
+ }
+
+ /**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
@@ -52,7 +60,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
while (iter.hasNext) {
val listener = iter.next()
try {
- onPostEvent(listener, event)
+ doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
@@ -64,7 +72,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
* Post an event to the specified listener. `doPostEvent` is guaranteed to be called in the same
* thread.
*/
- def onPostEvent(listener: L, event: E): Unit
+ protected def doPostEvent(listener: L, event: E): Unit
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
val c = implicitly[ClassTag[T]].runtimeClass
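addListener and removeListener can be safe from any thread because, in this version of Spark, ListenerBus keeps its listeners in a CopyOnWriteArrayList, whose iterators work on a snapshot. A minimal sketch of that design (assuming the copy-on-write backing; names are illustrative):

import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

class MiniBus[L, E](dispatch: (L, E) => Unit) {
  // Copy-on-write: postToAll iterates over a snapshot, so concurrent
  // addListener/removeListener calls never invalidate the iteration.
  private val listeners = new CopyOnWriteArrayList[L]
  def addListener(l: L): Unit = listeners.add(l)
  def removeListener(l: L): Unit = listeners.remove(l)
  def postToAll(e: E): Unit = listeners.asScala.foreach { l =>
    try dispatch(l, e)
    catch { case NonFatal(t) => println(s"listener threw: $t") }  // isolate bad listeners
  }
}

object MiniBusDemo {
  def main(args: Array[String]): Unit = {
    val bus = new MiniBus[String => Unit, String]((l, e) => l(e))
    val printer: String => Unit = s => println(s"got $s")
    bus.addListener(printer)
    bus.postToAll("event-1")     // prints: got event-1
    bus.removeListener(printer)
    bus.postToAll("event-2")     // prints nothing: printer was removed
  }
}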
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4430bfd3b0..6469201446 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -153,6 +153,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
+ ) ++ Seq(
+ // SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus")
)
case v if v.startsWith("1.6") =>
Seq(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 157ee92fd7..b7070dda99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,6 +37,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
@@ -44,7 +45,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookManager, ThreadUtils, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -694,9 +695,9 @@ class StreamingContext private[streaming] (
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
var shutdownHookRefToRemove: AnyRef = null
- if (AsynchronousListenerBus.withinListenerThread.value) {
- throw new SparkException("Cannot stop StreamingContext within listener thread of" +
- " AsynchronousListenerBus")
+ if (LiveListenerBus.withinListenerThread.value) {
+ throw new SparkException(
+ s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}")
}
synchronized {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1ed6fb0aa9..9535c8e5b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -49,7 +49,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
- val listenerBus = new StreamingListenerBus()
+ val listenerBus = new StreamingListenerBus(ssc.sparkContext.listenerBus)
// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
@@ -76,7 +76,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
- listenerBus.start(ssc.sparkContext)
+ listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index ca111bb636..39f6e711a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,19 +17,37 @@
package org.apache.spark.streaming.scheduler
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
+import org.apache.spark.util.ListenerBus
-import org.apache.spark.Logging
-import org.apache.spark.util.AsynchronousListenerBus
+/**
+ * A Streaming listener bus that forwards events to StreamingListeners. It wraps received
+ * Streaming events as WrappedStreamingListenerEvents and posts them to the Spark listener
+ * bus. It also registers itself with the Spark listener bus, so that it can receive
+ * WrappedStreamingListenerEvents, unwrap them into StreamingListenerEvents, and dispatch
+ * them to StreamingListeners.
+ */
+private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
+ extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
-/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus
- extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
- with Logging {
+ /**
+ * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be
+ * dispatched to all StreamingListeners in the thread of the Spark listener bus.
+ */
+ def post(event: StreamingListenerEvent) {
+ sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
+ }
- private val logDroppedEvent = new AtomicBoolean(false)
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case WrappedStreamingListenerEvent(e) =>
+ postToAll(e)
+ case _ =>
+ }
+ }
- override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+ protected override def doPostEvent(
+ listener: StreamingListener,
+ event: StreamingListenerEvent): Unit = {
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listener.onReceiverStarted(receiverStarted)
@@ -51,12 +69,31 @@ private[spark] class StreamingListenerBus
}
}
- override def onDropEvent(event: StreamingListenerEvent): Unit = {
- if (logDroppedEvent.compareAndSet(false, true)) {
- // Only log the following message once to avoid duplicated annoying logs.
- logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
- "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
- "rate at which events are being started by the scheduler.")
- }
+ /**
+ * Register this bus with the Spark listener bus so that it can receive Streaming events and
+ * forward them to StreamingListeners.
+ */
+ def start(): Unit = {
+ sparkListenerBus.addListener(this) // for getting callbacks on spark events
+ }
+
+ /**
+ * Unregister this bus from the Spark listener bus. After this call, StreamingListeners will
+ * not receive any further events.
+ */
+ def stop(): Unit = {
+ sparkListenerBus.removeListener(this)
+ }
+
+ /**
+ * Wraps a StreamingListenerEvent as a SparkListenerEvent so that it can be posted to the
+ * Spark listener bus.
+ */
+ private case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent)
+ extends SparkListenerEvent {
+
+ // Do not log streaming events in the event log, as the history server does not support
+ // streaming events (SPARK-12140). TODO: once SPARK-12140 is resolved, set this to true.
+ protected[spark] override def logEvent: Boolean = false
}
}
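The envelope pattern above lets streaming events ride on the core Spark bus and be dispatched in its single listener thread. A self-contained sketch of wrap-forward-unwrap with illustrative types (not Spark's API; the core bus here dispatches synchronously for brevity, whereas Spark's is asynchronous):

object ForwardingSketch {
  trait CoreEvent                                       // plays SparkListenerEvent
  trait StreamEvent                                     // plays StreamingListenerEvent
  case class Wrapped(e: StreamEvent) extends CoreEvent  // the envelope

  class CoreBus {                                       // stand-in for LiveListenerBus
    private var handlers = List.empty[CoreEvent => Unit]
    def addListener(h: CoreEvent => Unit): Unit = handlers ::= h
    def post(e: CoreEvent): Unit = handlers.foreach(_(e))
  }

  class StreamBus(core: CoreBus) {                      // stand-in for StreamingListenerBus
    private var listeners = List.empty[StreamEvent => Unit]
    def addStreamListener(l: StreamEvent => Unit): Unit = listeners ::= l
    def post(e: StreamEvent): Unit = core.post(Wrapped(e))  // wrap and forward
    def start(): Unit = core.addListener {
      case Wrapped(e) => listeners.foreach(_(e))            // unwrap and dispatch
      case _          =>                                    // ignore non-streaming events
    }
  }

  def main(args: Array[String]): Unit = {
    case class BatchDone(id: Int) extends StreamEvent
    val core = new CoreBus
    val stream = new StreamBus(core)
    stream.start()
    stream.addStreamListener(e => println(s"stream listener got $e"))
    stream.post(BatchDone(1))   // prints: stream listener got BatchDone(1)
  }
}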
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 2e231601c3..75591f04ca 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -77,7 +77,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Ensure progress listener has been notified of all events
- ssc.scheduler.listenerBus.waitUntilEmpty(500)
+ ssc.sparkContext.listenerBus.waitUntilEmpty(500)
// Verify all "InputInfo"s have been reported
assert(ssc.progressListener.numTotalReceivedRecords === input.size)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 628a508207..1ed68c74db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, Synch
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
+import org.mockito.Mockito.{mock, reset, verifyNoMoreInteractions}
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -216,6 +217,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
assert(failureReasons(1).contains("This is another failed job"))
}
+ test("StreamingListener receives no events after stopping StreamingListenerBus") {
+ val streamingListener = mock(classOf[StreamingListener])
+
+ ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ ssc.addStreamingListener(streamingListener)
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count)
+ ssc.start()
+ ssc.stop()
+
+ // Because "streamingListener" has already received some events, clear its recorded interactions.
+ reset(streamingListener)
+
+ // Post a Streaming event after stopping StreamingContext
+ val receiverInfoStopped = ReceiverInfo(0, "test", false, "localhost", "0")
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfoStopped))
+ ssc.sparkContext.listenerBus.waitUntilEmpty(1000)
+ // The StreamingListener should not receive any event
+ verifyNoMoreInteractions(streamingListener)
+ }
+
private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = {
val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc)
_ssc.addStreamingListener(contextStoppingCollector)
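The test above leans on two Mockito behaviors: reset discards interactions recorded before the bus stopped, and verifyNoMoreInteractions then fails if anything reaches the mock afterwards. A standalone sketch of the idiom, with a hypothetical listener trait:

import org.mockito.Mockito.{mock, reset, verifyNoMoreInteractions}

object MockitoIdiomSketch {
  // Hypothetical listener trait, mockable because it compiles to an interface.
  trait GreetingListener { def onGreeting(msg: String): Unit }

  def main(args: Array[String]): Unit = {
    val listener = mock(classOf[GreetingListener])
    listener.onGreeting("hello")          // interaction recorded by the mock
    reset(listener)                       // forget everything recorded so far
    verifyNoMoreInteractions(listener)    // passes: nothing happened after reset
  }
}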
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index b67189fbd7..cfd7f86f84 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -34,8 +34,6 @@ class ReceiverTrackerSuite extends TestSuiteBase {
test("send rate update to receivers") {
withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
- ssc.scheduler.listenerBus.start(ssc.sc)
-
val newRateLimit = 100L
val inputDStream = new RateTestInputDStream(ssc)
val tracker = new ReceiverTracker(ssc)