From 944fdadf77523570f6b33544ad0b388031498952 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 20 Jan 2016 11:57:53 -0800 Subject: [SPARK-12847][CORE][STREAMING] Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events Including the following changes: 1. Add StreamingListenerForwardingBus to WrappedStreamingListenerEvent process events in `onOtherEvent` to StreamingListener 2. Remove StreamingListenerBus 3. Merge AsynchronousListenerBus and LiveListenerBus to the same class LiveListenerBus 4. Add `logEvent` method to SparkListenerEvent so that EventLoggingListener can use it to ignore WrappedStreamingListenerEvents Author: Shixiong Zhu Closes #10779 from zsxwing/streaming-listener. --- .../apache/spark/streaming/StreamingContext.scala | 9 +-- .../spark/streaming/scheduler/JobScheduler.scala | 4 +- .../streaming/scheduler/StreamingListenerBus.scala | 69 +++++++++++++++++----- .../apache/spark/streaming/InputStreamsSuite.scala | 2 +- .../spark/streaming/StreamingListenerSuite.scala | 22 +++++++ .../streaming/scheduler/ReceiverTrackerSuite.scala | 2 - 6 files changed, 83 insertions(+), 25 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 157ee92fd7..b7070dda99 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -37,6 +37,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.input.FixedLengthBinaryInputFormat import org.apache.spark.rdd.{RDD, RDDOperationScope} +import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.SerializationDebugger import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ @@ -44,7 +45,7 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver} import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} -import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookManager, ThreadUtils, Utils} +import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils} /** * Main entry point for Spark Streaming functionality. It provides methods used to create @@ -694,9 +695,9 @@ class StreamingContext private[streaming] ( */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { var shutdownHookRefToRemove: AnyRef = null - if (AsynchronousListenerBus.withinListenerThread.value) { - throw new SparkException("Cannot stop StreamingContext within listener thread of" + - " AsynchronousListenerBus") + if (LiveListenerBus.withinListenerThread.value) { + throw new SparkException( + s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}") } synchronized { // The state should always be Stopped after calling `stop()`, even if we haven't started yet diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 1ed6fb0aa9..9535c8e5b7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -49,7 +49,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") private val jobGenerator = new JobGenerator(this) val clock = jobGenerator.clock - val listenerBus = new StreamingListenerBus() + val listenerBus = new StreamingListenerBus(ssc.sparkContext.listenerBus) // These two are created only when scheduler starts. // eventLoop not being null means the scheduler has been started and not stopped @@ -76,7 +76,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) - listenerBus.start(ssc.sparkContext) + listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index ca111bb636..39f6e711a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -17,19 +17,37 @@ package org.apache.spark.streaming.scheduler -import java.util.concurrent.atomic.AtomicBoolean +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent} +import org.apache.spark.util.ListenerBus -import org.apache.spark.Logging -import org.apache.spark.util.AsynchronousListenerBus +/** + * A Streaming listener bus to forward events to StreamingListeners. This one will wrap received + * Streaming events as WrappedStreamingListenerEvent and send them to Spark listener bus. It also + * registers itself with Spark listener bus, so that it can receive WrappedStreamingListenerEvents, + * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners. + */ +private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) + extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] { -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */ -private[spark] class StreamingListenerBus - extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus") - with Logging { + /** + * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be + * dispatched to all StreamingListeners in the thread of the Spark listener bus. + */ + def post(event: StreamingListenerEvent) { + sparkListenerBus.post(new WrappedStreamingListenerEvent(event)) + } - private val logDroppedEvent = new AtomicBoolean(false) + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case WrappedStreamingListenerEvent(e) => + postToAll(e) + case _ => + } + } - override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = { + protected override def doPostEvent( + listener: StreamingListener, + event: StreamingListenerEvent): Unit = { event match { case receiverStarted: StreamingListenerReceiverStarted => listener.onReceiverStarted(receiverStarted) @@ -51,12 +69,31 @@ private[spark] class StreamingListenerBus } } - override def onDropEvent(event: StreamingListenerEvent): Unit = { - if (logDroppedEvent.compareAndSet(false, true)) { - // Only log the following message once to avoid duplicated annoying logs. - logError("Dropping StreamingListenerEvent because no remaining room in event queue. " + - "This likely means one of the StreamingListeners is too slow and cannot keep up with the " + - "rate at which events are being started by the scheduler.") - } + /** + * Register this one with the Spark listener bus so that it can receive Streaming events and + * forward them to StreamingListeners. + */ + def start(): Unit = { + sparkListenerBus.addListener(this) // for getting callbacks on spark events + } + + /** + * Unregister this one with the Spark listener bus and all StreamingListeners won't receive any + * events after that. + */ + def stop(): Unit = { + sparkListenerBus.removeListener(this) + } + + /** + * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark + * listener bus. + */ + private case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent) + extends SparkListenerEvent { + + // Do not log streaming events in event log as history server does not support streaming + // events (SPARK-12140). TODO Once SPARK-12140 is resolved we should set it to true. + protected[spark] override def logEvent: Boolean = false } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 2e231601c3..75591f04ca 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -77,7 +77,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Ensure progress listener has been notified of all events - ssc.scheduler.listenerBus.waitUntilEmpty(500) + ssc.sparkContext.listenerBus.waitUntilEmpty(500) // Verify all "InputInfo"s have been reported assert(ssc.progressListener.numTotalReceivedRecords === input.size) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 628a508207..1ed68c74db 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, Synch import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import org.mockito.Mockito.{mock, reset, verifyNoMoreInteractions} import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -216,6 +217,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { assert(failureReasons(1).contains("This is another failed job")) } + test("StreamingListener receives no events after stopping StreamingListenerBus") { + val streamingListener = mock(classOf[StreamingListener]) + + ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) + ssc.addStreamingListener(streamingListener) + val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) + inputStream.foreachRDD(_.count) + ssc.start() + ssc.stop() + + // Because "streamingListener" has already received some events, let's clear that. + reset(streamingListener) + + // Post a Streaming event after stopping StreamingContext + val receiverInfoStopped = ReceiverInfo(0, "test", false, "localhost", "0") + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfoStopped)) + ssc.sparkContext.listenerBus.waitUntilEmpty(1000) + // The StreamingListener should not receive any event + verifyNoMoreInteractions(streamingListener) + } + private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = { val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc) _ssc.addStreamingListener(contextStoppingCollector) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index b67189fbd7..cfd7f86f84 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -34,8 +34,6 @@ class ReceiverTrackerSuite extends TestSuiteBase { test("send rate update to receivers") { withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => - ssc.scheduler.listenerBus.start(ssc.sc) - val newRateLimit = 100L val inputDStream = new RateTestInputDStream(ssc) val tracker = new ReceiverTracker(ssc) -- cgit v1.2.3